Browse Source

[Improvement][API][num-8] add ProcessTaskRelationServiceImpl.createProcessTaskRelation (#6970)

* add ProcessTaskRelationServiceImpl.createProcessTaskRelation

* code style
3.0.0/version-upgrade
JinYong Li 3 years ago committed by GitHub
parent
commit
e51a2a1642
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
  2. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 102
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  4. 68
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
  5. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
  6. 22
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  7. 10
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
  8. 29
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_TASK_RELATION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_PROCESS_RELATION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.MOVE_PROCESS_TASK_RELATION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_PROCESS_RELATION_ERROR;
@ -29,6 +30,9 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
@ -85,7 +89,15 @@ public class ProcessTaskRelationController extends BaseController {
@RequestParam(name = "processDefinitionCode", required = true) long processDefinitionCode,
@RequestParam(name = "preTaskCode", required = true) long preTaskCode,
@RequestParam(name = "postTaskCode", required = true) long postTaskCode) {
return returnDataList(processTaskRelationService.createProcessTaskRelation(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode));
Map<String, Object> result = new HashMap<>();
if (postTaskCode == 0L) {
putMsg(result, DATA_IS_NOT_VALID, "postTaskCode");
} else if (processDefinitionCode == 0L) {
putMsg(result, DATA_IS_NOT_VALID, "processDefinitionCode");
} else {
result = processTaskRelationService.createProcessTaskRelation(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode);
}
return returnDataList(result);
}
/**

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -287,6 +287,7 @@ public enum Status {
TASK_DEFINE_STATE_ONLINE(50050, "task definition {0} is already on line", "任务定义[{0}]已上线"),
TASK_HAS_DOWNSTREAM(50051, "Task [{0}] exists downstream dependence", "任务[{0}]存在下游依赖"),
MAIN_TABLE_USING_VERSION(50052, "the version that the master table is using", "主表正在使用该版本"),
PROJECT_PROCESS_NOT_MATCH(50053, "the project and the process is not match", "项目和工作流不匹配"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/**

102
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -18,32 +18,37 @@
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* process task relation service impl
@ -51,8 +56,6 @@ import org.springframework.stereotype.Service;
@Service
public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements ProcessTaskRelationService {
private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class);
@Autowired
private ProjectMapper projectMapper;
@ -62,11 +65,18 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private UserMapper userMapper;
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
/**
* create process task relation
*
@ -77,9 +87,87 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
* @param postTaskCode postTaskCode
* @return create result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> createProcessTaskRelation(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode) {
return null;
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}
if (processDefinition.getProjectCode() != projectCode) {
putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, postTaskCode);
if (!processTaskRelations.isEmpty()) {
Map<Long, ProcessTaskRelation> preTaskCodeMap = processTaskRelations.stream()
.collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation));
if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode);
return result;
}
if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(preTaskCodeMap.get(0L));
// delete no upstream
int delete = processTaskRelationMapper.deleteRelation(processTaskRelationLog);
int deleteLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
if ((delete & deleteLog) == 0) {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
}
}
Date now = new Date();
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
if (preTaskCode != 0L) {
// upstream is or not exist
List<ProcessTaskRelation> upstreamProcessTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, preTaskCode);
TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode);
if (upstreamProcessTaskRelations.isEmpty()) {
ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), preTaskDefinition);
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLogs.add(processTaskRelationLog);
}
TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
processTaskRelationLog.setPreTaskCode(preTaskDefinition.getCode());
processTaskRelationLog.setPreTaskVersion(preTaskDefinition.getVersion());
processTaskRelationLogs.add(processTaskRelationLog);
} else {
TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLogs.add(processTaskRelationLog);
}
int insert = processTaskRelationMapper.batchInsert(processTaskRelationLogs);
int insertLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs);
if ((insert & insertLog) > 0) {
putMsg(result, Status.SUCCESS);
}
return result;
}
private ProcessTaskRelationLog setRelationLog(ProcessDefinition processDefinition, Date now, int userId, TaskDefinition taskDefinition) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setPostTaskCode(taskDefinition.getCode());
processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(userId);
processTaskRelationLog.setOperateTime(now);
return processTaskRelationLog;
}
/**

68
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

@ -22,14 +22,19 @@ import org.apache.dolphinscheduler.api.service.impl.ProcessTaskRelationServiceIm
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.commons.collections.CollectionUtils;
@ -50,6 +55,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import com.google.common.collect.Lists;
/**
* process task instance relation service test
*/
@ -71,6 +78,15 @@ public class ProcessTaskRelationServiceTest {
@Mock
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Mock
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
/**
* get Mock Admin User
*
@ -210,6 +226,58 @@ public class ProcessTaskRelationServiceTest {
return processTaskRelationList;
}
private ProcessDefinition getProcessDefinition() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(1);
processDefinition.setProjectCode(1L);
processDefinition.setName("test_pdf");
processDefinition.setTenantId(1);
processDefinition.setDescription("");
processDefinition.setCode(1L);
return processDefinition;
}
private TaskDefinition getTaskDefinition() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(1L);
taskDefinition.setCode(1L);
taskDefinition.setVersion(1);
return taskDefinition;
}
@Test
public void testCreateProcessTaskRelation() {
long projectCode = 1L;
long processDefinitionCode = 1L;
long preTaskCode = 0L;
long postTaskCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList());
Mockito.when(taskDefinitionMapper.queryByCode(postTaskCode)).thenReturn(getTaskDefinition());
List<ProcessTaskRelationLog> processTaskRelationList = Lists.newArrayList();
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(postTaskCode);
processTaskRelationLog.setPostTaskVersion(1);
processTaskRelationList.add(processTaskRelationLog);
Mockito.when(processTaskRelationMapper.batchInsert(processTaskRelationList)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.batchInsert(processTaskRelationList)).thenReturn(1);
}
@Test
public void testQueryDownstreamRelation() {
long projectCode = 1L;

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java

@ -57,4 +57,12 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper<ProcessTaskRela
*/
int deleteByCode(@Param("processCode") long processCode,
@Param("processVersion") int processVersion);
/**
* delete process task relation
*
* @param processTaskRelationLog processTaskRelationLog
* @return int
*/
int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog processTaskRelationLog);
}

22
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -100,4 +100,26 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode);
/**
* query by code
*
* @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
* @param preTaskCode preTaskCode
* @param postTaskCode postTaskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryByCode(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode,
@Param("preTaskCode") long preTaskCode,
@Param("postTaskCode") long postTaskCode);
/**
* delete process task relation
*
* @param processTaskRelationLog processTaskRelationLog
* @return int
*/
int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog processTaskRelationLog);
}

10
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml

@ -46,4 +46,14 @@
WHERE process_definition_code = #{processCode}
and process_definition_version = #{processVersion}
</delete>
<delete id="deleteRelation">
delete from t_ds_process_task_relation_log
WHERE project_code = #{processTaskRelationLog.projectCode}
and process_definition_code = #{processTaskRelationLog.processDefinitionCode}
and process_definition_version = #{processTaskRelationLog.processDefinitionVersion}
and pre_task_code = #{processTaskRelationLog.preTaskCode}
and pre_task_version = #{processTaskRelationLog.preTaskVersion}
and post_task_code = #{processTaskRelationLog.postTaskCode}
and post_task_version = #{processTaskRelationLog.post_task_version}
</delete>
</mapper>

29
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -92,4 +92,33 @@
and post_task_code = #{taskCode}
</select>
<select id="queryByCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE 1=1
<if test="projectCode != 0">
and project_code = #{projectCode}
</if>
<if test="processDefinitionCode != 0">
and process_definition_code = #{processDefinitionCode}
</if>
<if test="preTaskCode != 0">
and pre_task_code = #{preTaskCode}
</if>
<if test="postTaskCode != 0">
and post_task_code = #{postTaskCode}
</if>
</select>
<delete id="deleteRelation">
delete from t_ds_process_task_relation
WHERE project_code = #{processTaskRelationLog.projectCode}
and process_definition_code = #{processTaskRelationLog.processDefinitionCode}
and process_definition_version = #{processTaskRelationLog.processDefinitionVersion}
and pre_task_code = #{processTaskRelationLog.preTaskCode}
and pre_task_version = #{processTaskRelationLog.preTaskVersion}
and post_task_code = #{processTaskRelationLog.postTaskCode}
and post_task_version = #{processTaskRelationLog.post_task_version}
</delete>
</mapper>

Loading…
Cancel
Save