diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java index 562661fe8e..5c1e3f427b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_TASK_RELATION_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_PROCESS_RELATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.MOVE_PROCESS_TASK_RELATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_PROCESS_RELATION_ERROR; @@ -29,6 +30,9 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.User; +import java.util.HashMap; +import java.util.Map; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.DeleteMapping; @@ -85,7 +89,15 @@ public class ProcessTaskRelationController extends BaseController { @RequestParam(name = "processDefinitionCode", required = true) long processDefinitionCode, @RequestParam(name = "preTaskCode", required = true) long preTaskCode, @RequestParam(name = "postTaskCode", required = true) long postTaskCode) { - return returnDataList(processTaskRelationService.createProcessTaskRelation(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode)); + Map result = new HashMap<>(); + if (postTaskCode == 0L) { + putMsg(result, DATA_IS_NOT_VALID, "postTaskCode"); + } else if (processDefinitionCode == 0L) { + putMsg(result, DATA_IS_NOT_VALID, "processDefinitionCode"); + } else { + result = processTaskRelationService.createProcessTaskRelation(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode); + } + return returnDataList(result); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 625c9134ec..506e6a6558 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -287,6 +287,7 @@ public enum Status { TASK_DEFINE_STATE_ONLINE(50050, "task definition {0} is already on line", "任务定义[{0}]已上线"), TASK_HAS_DOWNSTREAM(50051, "Task [{0}] exists downstream dependence", "任务[{0}]存在下游依赖"), MAIN_TABLE_USING_VERSION(50052, "the version that the master table is using", "主表正在使用该版本"), + PROJECT_PROCESS_NOT_MATCH(50053, "the project and the process is not match", "项目和工作流不匹配"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index 4d99de75b7..3d6e4a25c7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -18,32 +18,37 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; -import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.commons.collections.CollectionUtils; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; /** * process task relation service impl @@ -51,8 +56,6 @@ import org.springframework.stereotype.Service; @Service public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements ProcessTaskRelationService { - private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class); - @Autowired private ProjectMapper projectMapper; @@ -62,11 +65,18 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; + @Autowired + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + @Autowired private TaskDefinitionLogMapper taskDefinitionLogMapper; @Autowired - private UserMapper userMapper; + private TaskDefinitionMapper taskDefinitionMapper; + + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + /** * create process task relation * @@ -77,9 +87,87 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P * @param postTaskCode postTaskCode * @return create result code */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map createProcessTaskRelation(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode) { - return null; + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); + return result; + } + if (processDefinition.getProjectCode() != projectCode) { + putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH); + return result; + } + List processTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, postTaskCode); + if (!processTaskRelations.isEmpty()) { + Map preTaskCodeMap = processTaskRelations.stream() + .collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation)); + if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) { + putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode); + return result; + } + if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(preTaskCodeMap.get(0L)); + // delete no upstream + int delete = processTaskRelationMapper.deleteRelation(processTaskRelationLog); + int deleteLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); + if ((delete & deleteLog) == 0) { + putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } + } + } + Date now = new Date(); + List processTaskRelationLogs = new ArrayList<>(); + if (preTaskCode != 0L) { + // upstream is or not exist + List upstreamProcessTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, preTaskCode); + TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode); + if (upstreamProcessTaskRelations.isEmpty()) { + ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), preTaskDefinition); + processTaskRelationLog.setPreTaskCode(0L); + processTaskRelationLog.setPreTaskVersion(0); + processTaskRelationLogs.add(processTaskRelationLog); + } + TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); + ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition); + processTaskRelationLog.setPreTaskCode(preTaskDefinition.getCode()); + processTaskRelationLog.setPreTaskVersion(preTaskDefinition.getVersion()); + processTaskRelationLogs.add(processTaskRelationLog); + } else { + TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); + ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition); + processTaskRelationLog.setPreTaskCode(0L); + processTaskRelationLog.setPreTaskVersion(0); + processTaskRelationLogs.add(processTaskRelationLog); + } + int insert = processTaskRelationMapper.batchInsert(processTaskRelationLogs); + int insertLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs); + if ((insert & insertLog) > 0) { + putMsg(result, Status.SUCCESS); + } + return result; + } + + private ProcessTaskRelationLog setRelationLog(ProcessDefinition processDefinition, Date now, int userId, TaskDefinition taskDefinition) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setProjectCode(processDefinition.getProjectCode()); + processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); + processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); + processTaskRelationLog.setPostTaskCode(taskDefinition.getCode()); + processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion()); + processTaskRelationLog.setCreateTime(now); + processTaskRelationLog.setUpdateTime(now); + processTaskRelationLog.setOperator(userId); + processTaskRelationLog.setOperateTime(now); + return processTaskRelationLog; } /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java index aaccbd0c79..758eceb4e4 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java @@ -22,14 +22,19 @@ import org.apache.dolphinscheduler.api.service.impl.ProcessTaskRelationServiceIm import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.commons.collections.CollectionUtils; @@ -50,6 +55,8 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import com.google.common.collect.Lists; + /** * process task instance relation service test */ @@ -71,6 +78,15 @@ public class ProcessTaskRelationServiceTest { @Mock private TaskDefinitionLogMapper taskDefinitionLogMapper; + @Mock + private ProcessDefinitionMapper processDefinitionMapper; + + @Mock + private TaskDefinitionMapper taskDefinitionMapper; + + @Mock + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + /** * get Mock Admin User * @@ -210,6 +226,58 @@ public class ProcessTaskRelationServiceTest { return processTaskRelationList; } + private ProcessDefinition getProcessDefinition() { + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setId(1); + processDefinition.setProjectCode(1L); + processDefinition.setName("test_pdf"); + processDefinition.setTenantId(1); + processDefinition.setDescription(""); + processDefinition.setCode(1L); + return processDefinition; + } + + private TaskDefinition getTaskDefinition() { + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setProjectCode(1L); + taskDefinition.setCode(1L); + taskDefinition.setVersion(1); + return taskDefinition; + } + + @Test + public void testCreateProcessTaskRelation() { + long projectCode = 1L; + long processDefinitionCode = 1L; + long preTaskCode = 0L; + long postTaskCode = 1L; + + Project project = getProject(projectCode); + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, projectCode); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition()); + Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList()); + Mockito.when(taskDefinitionMapper.queryByCode(postTaskCode)).thenReturn(getTaskDefinition()); + List processTaskRelationList = Lists.newArrayList(); + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setProjectCode(projectCode); + processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); + processTaskRelationLog.setPreTaskCode(0L); + processTaskRelationLog.setPreTaskVersion(0); + processTaskRelationLog.setPostTaskCode(postTaskCode); + processTaskRelationLog.setPostTaskVersion(1); + processTaskRelationList.add(processTaskRelationLog); + Mockito.when(processTaskRelationMapper.batchInsert(processTaskRelationList)).thenReturn(1); + Mockito.when(processTaskRelationLogMapper.batchInsert(processTaskRelationList)).thenReturn(1); + } + @Test public void testQueryDownstreamRelation() { long projectCode = 1L; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java index 472018baf6..b086377365 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java @@ -57,4 +57,12 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode); + + /** + * query by code + * + * @param projectCode projectCode + * @param processDefinitionCode processDefinitionCode + * @param preTaskCode preTaskCode + * @param postTaskCode postTaskCode + * @return ProcessTaskRelation + */ + List queryByCode(@Param("projectCode") long projectCode, + @Param("processDefinitionCode") long processDefinitionCode, + @Param("preTaskCode") long preTaskCode, + @Param("postTaskCode") long postTaskCode); + + /** + * delete process task relation + * + * @param processTaskRelationLog processTaskRelationLog + * @return int + */ + int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog processTaskRelationLog); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml index 2eafb31a84..c056dbfa07 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml @@ -46,4 +46,14 @@ WHERE process_definition_code = #{processCode} and process_definition_version = #{processVersion} + + delete from t_ds_process_task_relation_log + WHERE project_code = #{processTaskRelationLog.projectCode} + and process_definition_code = #{processTaskRelationLog.processDefinitionCode} + and process_definition_version = #{processTaskRelationLog.processDefinitionVersion} + and pre_task_code = #{processTaskRelationLog.preTaskCode} + and pre_task_version = #{processTaskRelationLog.preTaskVersion} + and post_task_code = #{processTaskRelationLog.postTaskCode} + and post_task_version = #{processTaskRelationLog.post_task_version} + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index d19dec640d..11602eba4c 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -92,4 +92,33 @@ and post_task_code = #{taskCode} + + + + delete from t_ds_process_task_relation + WHERE project_code = #{processTaskRelationLog.projectCode} + and process_definition_code = #{processTaskRelationLog.processDefinitionCode} + and process_definition_version = #{processTaskRelationLog.processDefinitionVersion} + and pre_task_code = #{processTaskRelationLog.preTaskCode} + and pre_task_version = #{processTaskRelationLog.preTaskVersion} + and post_task_code = #{processTaskRelationLog.postTaskCode} + and post_task_version = #{processTaskRelationLog.post_task_version} +