diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java index db75a43a48..8b00eefd81 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java @@ -21,11 +21,15 @@ import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.api.service.impl.WorkFlowLineageServiceImpl; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.ProcessLineage; +import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -36,6 +40,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; /** @@ -50,28 +55,75 @@ public class WorkFlowLineageServiceTest { @Mock private WorkFlowLineageMapper workFlowLineageMapper; + @Mock + private ProjectMapper projectMapper; + + /** + * get mock Project + * + * @param projectName projectName + * @return Project + */ + private Project getProject(String projectName) { + Project project = new Project(); + project.setCode(1L); + project.setId(1); + project.setName(projectName); + project.setUserId(1); + return project; + } + @Test public void testQueryWorkFlowLineageByName() { + Project project = getProject("test"); String searchVal = "test"; - when(workFlowLineageMapper.queryByName(searchVal, 1L)).thenReturn(getWorkFlowLineages()); - Map result = workFlowLineageService.queryWorkFlowLineageByName(searchVal,1); - List workFlowLineageList = (List)result.get(Constants.DATA_LIST); - Assert.assertTrue(workFlowLineageList.size()>0); + when(projectMapper.selectById(1)).thenReturn(project); + when(workFlowLineageMapper.queryByName(Mockito.any(), Mockito.any())).thenReturn(getWorkFlowLineages()); + Map result = workFlowLineageService.queryWorkFlowLineageByName(searchVal, 1); + List workFlowLineageList = (List) result.get(Constants.DATA_LIST); + Assert.assertTrue(workFlowLineageList.size() > 0); } @Test public void testQueryWorkFlowLineageByIds() { - Set ids = new HashSet<>(); ids.add(1); ids.add(2); - Map result = workFlowLineageService.queryWorkFlowLineageByIds(ids,1); - Map workFlowLists = (Map)result.get(Constants.DATA_LIST); - List workFlowLineages = (List)workFlowLists.get("workFlowList"); - List workFlowRelations = (List)workFlowLists.get("workFlowRelationList"); - Assert.assertTrue(workFlowLineages.size()>0); - Assert.assertTrue(workFlowRelations.size()>0); + Project project = getProject("test"); + + List processLineages = new ArrayList<>(); + ProcessLineage processLineage = new ProcessLineage(); + processLineage.setPreTaskVersion(1); + processLineage.setPreTaskCode(1L); + processLineage.setPostTaskCode(2L); + processLineage.setPostTaskVersion(1); + processLineage.setProcessDefinitionCode(1111L); + processLineage.setProcessDefinitionVersion(1); + processLineage.setProjectCode(1111L); + processLineages.add(processLineage); + + WorkFlowLineage workFlowLineage = new WorkFlowLineage(); + workFlowLineage.setSourceWorkFlowId(""); + + when(projectMapper.selectById(1)).thenReturn(project); + when(workFlowLineageMapper.queryRelationByIds(ids, project.getCode())).thenReturn(processLineages); + when(workFlowLineageMapper.queryCodeRelation(processLineage.getPostTaskCode() + , processLineage.getPreTaskVersion() + , processLineage.getProcessDefinitionCode() + , processLineage.getProjectCode())) + .thenReturn(processLineages); + when(workFlowLineageMapper + .queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(), processLineage.getProjectCode())) + .thenReturn(workFlowLineage); + + Map result = workFlowLineageService.queryWorkFlowLineageByIds(ids, 1); + + Map workFlowLists = (Map) result.get(Constants.DATA_LIST); + Collection workFlowLineages = (Collection) workFlowLists.get(Constants.WORKFLOW_LIST); + Set workFlowRelations = (Set) workFlowLists.get(Constants.WORKFLOW_RELATION_LIST); + Assert.assertTrue(workFlowLineages.size() > 0); + Assert.assertTrue(workFlowRelations.size() > 0); } private List getWorkFlowLineages() { @@ -83,13 +135,4 @@ public class WorkFlowLineageServiceTest { return workFlowLineages; } - private List getWorkFlowRelation(){ - List workFlowRelations = new ArrayList<>(); - WorkFlowRelation workFlowRelation = new WorkFlowRelation(); - workFlowRelation.setSourceWorkFlowId(1); - workFlowRelation.setTargetWorkFlowId(2); - workFlowRelations.add(workFlowRelation); - return workFlowRelations; - } - } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionVersion.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionVersion.java deleted file mode 100644 index 1c4d979415..0000000000 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionVersion.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.dao.entity; - -import java.util.Date; - -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableField; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; -import com.fasterxml.jackson.annotation.JsonFormat; - - -/** - * process definition version - */ -@TableName("t_ds_process_definition_version") -public class ProcessDefinitionVersion { - - /** - * id - */ - @TableId(value = "id", type = IdType.AUTO) - private int id; - - /** - * process definition id - */ - private int processDefinitionId; - - /** - * version - */ - private long version; - - /** - * definition json string - */ - private String processDefinitionJson; - - /** - * description - */ - private String description; - - /** - * process warning time out. unit: minute - */ - private int timeout; - - /** - * resource ids - */ - private String resourceIds; - - /** - * create time - */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") - private Date createTime; - - /** - * user defined parameters - */ - private String globalParams; - - /** - * locations array for web - */ - private String locations; - - /** - * connects array for web - */ - private String connects; - - - /** - * warningGroupId - */ - @TableField(exist = false) - private int warningGroupId; - - public String getGlobalParams() { - return globalParams; - } - - public void setGlobalParams(String globalParams) { - this.globalParams = globalParams; - } - - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } - - public int getProcessDefinitionId() { - return processDefinitionId; - } - - public void setProcessDefinitionId(int processDefinitionId) { - this.processDefinitionId = processDefinitionId; - } - - public long getVersion() { - return version; - } - - public void setVersion(long version) { - this.version = version; - } - - public String getProcessDefinitionJson() { - return processDefinitionJson; - } - - public void setProcessDefinitionJson(String processDefinitionJson) { - this.processDefinitionJson = processDefinitionJson; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public String getLocations() { - return locations; - } - - public void setLocations(String locations) { - this.locations = locations; - } - - public String getConnects() { - return connects; - } - - public void setConnects(String connects) { - this.connects = connects; - } - - public int getTimeout() { - return timeout; - } - - public void setTimeout(int timeout) { - this.timeout = timeout; - } - - public String getResourceIds() { - return resourceIds; - } - - public void setResourceIds(String resourceIds) { - this.resourceIds = resourceIds; - } - - public int getWarningGroupId() { - return warningGroupId; - } - - public void setWarningGroupId(int warningGroupId) { - this.warningGroupId = warningGroupId; - } - - @Override - public String toString() { - return "ProcessDefinitionVersion{" - + "id=" + id - + ", processDefinitionId=" + processDefinitionId - + ", version=" + version - + ", processDefinitionJson='" + processDefinitionJson + '\'' - + ", description='" + description + '\'' - + ", globalParams='" + globalParams + '\'' - + ", createTime=" + createTime - + ", locations='" + locations + '\'' - + ", connects='" + connects + '\'' - + ", timeout=" + timeout - + ", warningGroupId=" + warningGroupId - + ", resourceIds='" + resourceIds + '\'' - + '}'; - } - - public static Builder newBuilder() { - return new Builder(); - } - - public static final class Builder { - private int id; - private int processDefinitionId; - private long version; - private String processDefinitionJson; - private String description; - private String globalParams; - private Date createTime; - private String locations; - private String connects; - private int timeout; - private int warningGroupId; - private String resourceIds; - - private Builder() { - } - - public Builder id(int id) { - this.id = id; - return this; - } - - public Builder processDefinitionId(int processDefinitionId) { - this.processDefinitionId = processDefinitionId; - return this; - } - - public Builder version(long version) { - this.version = version; - return this; - } - - public Builder processDefinitionJson(String processDefinitionJson) { - this.processDefinitionJson = processDefinitionJson; - return this; - } - - public Builder description(String description) { - this.description = description; - return this; - } - - public Builder globalParams(String globalParams) { - this.globalParams = globalParams; - return this; - } - - public Builder createTime(Date createTime) { - this.createTime = createTime; - return this; - } - - public Builder locations(String locations) { - this.locations = locations; - return this; - } - - public Builder connects(String connects) { - this.connects = connects; - return this; - } - - public Builder timeout(int timeout) { - this.timeout = timeout; - return this; - } - - public Builder warningGroupId(int warningGroupId) { - this.warningGroupId = warningGroupId; - return this; - } - - public Builder resourceIds(String resourceIds) { - this.resourceIds = resourceIds; - return this; - } - - public ProcessDefinitionVersion build() { - ProcessDefinitionVersion processDefinitionVersion = new ProcessDefinitionVersion(); - processDefinitionVersion.setId(id); - processDefinitionVersion.setProcessDefinitionId(processDefinitionId); - processDefinitionVersion.setVersion(version); - processDefinitionVersion.setProcessDefinitionJson(processDefinitionJson); - processDefinitionVersion.setDescription(description); - processDefinitionVersion.setGlobalParams(globalParams); - processDefinitionVersion.setCreateTime(createTime); - processDefinitionVersion.setLocations(locations); - processDefinitionVersion.setConnects(connects); - processDefinitionVersion.setTimeout(timeout); - processDefinitionVersion.setWarningGroupId(warningGroupId); - processDefinitionVersion.setResourceIds(resourceIds); - return processDefinitionVersion; - } - } -} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java index 768f75c82a..8b6d762413 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java @@ -18,14 +18,15 @@ package org.apache.dolphinscheduler.dao.upgrade; import org.apache.dolphinscheduler.common.utils.ConnectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.*; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ProcessDefinitionDao { @@ -34,12 +35,13 @@ public class ProcessDefinitionDao { /** * queryAllProcessDefinition + * * @param conn jdbc connection * @return ProcessDefinition Json List */ - public Map queryAllProcessDefinition(Connection conn){ + public Map queryAllProcessDefinition(Connection conn) { - Map processDefinitionJsonMap = new HashMap<>(); + Map processDefinitionJsonMap = new HashMap<>(); String sql = String.format("SELECT id,process_definition_json FROM t_ds_process_definition"); ResultSet rs = null; @@ -48,14 +50,14 @@ public class ProcessDefinitionDao { pstmt = conn.prepareStatement(sql); rs = pstmt.executeQuery(); - while (rs.next()){ + while (rs.next()) { Integer id = rs.getInt(1); String processDefinitionJson = rs.getString(2); - processDefinitionJsonMap.put(id,processDefinitionJson); + processDefinitionJsonMap.put(id, processDefinitionJson); } } catch (Exception e) { - logger.error(e.getMessage(),e); + logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); } finally { ConnectionUtils.releaseResource(rs, pstmt, conn); @@ -67,23 +69,24 @@ public class ProcessDefinitionDao { /** * updateProcessDefinitionJson + * * @param conn jdbc connection * @param processDefinitionJsonMap processDefinitionJsonMap */ - public void updateProcessDefinitionJson(Connection conn,Map processDefinitionJsonMap){ + public void updateProcessDefinitionJson(Connection conn, Map processDefinitionJsonMap) { String sql = "UPDATE t_ds_process_definition SET process_definition_json=? where id=?"; try { - for (Map.Entry entry : processDefinitionJsonMap.entrySet()){ - try(PreparedStatement pstmt= conn.prepareStatement(sql)) { - pstmt.setString(1,entry.getValue()); - pstmt.setInt(2,entry.getKey()); + for (Map.Entry entry : processDefinitionJsonMap.entrySet()) { + try (PreparedStatement pstmt = conn.prepareStatement(sql)) { + pstmt.setString(1, entry.getValue()); + pstmt.setInt(2, entry.getKey()); pstmt.executeUpdate(); } } } catch (Exception e) { - logger.error(e.getMessage(),e); + logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); } finally { ConnectionUtils.releaseResource(conn); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java index 909d71354c..28e9dc2ba4 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java @@ -16,8 +16,19 @@ */ package org.apache.dolphinscheduler.dao.mapper; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessLineage; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; -import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; + +import java.util.Date; +import java.util.HashSet; +import java.util.List; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -27,21 +38,124 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - @RunWith(SpringRunner.class) @SpringBootTest @Transactional @Rollback(true) public class WorkFlowLineageMapperTest { + @Autowired private WorkFlowLineageMapper workFlowLineageMapper; + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + + @Autowired + private ScheduleMapper scheduleMapper; + + @Autowired + ProcessTaskRelationMapper processTaskRelationMapper; + + /** + * insert + * + * @return ProcessDefinition + */ + private ProcessTaskRelation insertOneProcessTaskRelation() { + //insertOne + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setName("def 1"); + + processTaskRelation.setProjectCode(1L); + processTaskRelation.setProcessDefinitionCode(1L); + processTaskRelation.setPostTaskCode(3L); + processTaskRelation.setPostTaskVersion(1); + processTaskRelation.setPreTaskCode(2L); + processTaskRelation.setPreTaskVersion(1); + processTaskRelation.setUpdateTime(new Date()); + processTaskRelation.setCreateTime(new Date()); + processTaskRelationMapper.insert(processTaskRelation); + return processTaskRelation; + } + + /** + * insert + * + * @return ProcessDefinition + */ + private ProcessDefinition insertOneProcessDefinition() { + //insertOne + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setCode(1L); + processDefinition.setName("def 1"); + processDefinition.setProjectCode(1L); + processDefinition.setUserId(101); + processDefinition.setUpdateTime(new Date()); + processDefinition.setCreateTime(new Date()); + processDefinitionMapper.insert(processDefinition); + return processDefinition; + } + + /** + * insert + * + * @return Schedule + */ + private Schedule insertOneSchedule(int id) { + //insertOne + Schedule schedule = new Schedule(); + schedule.setStartTime(new Date()); + schedule.setEndTime(new Date()); + schedule.setCrontab(""); + schedule.setFailureStrategy(FailureStrategy.CONTINUE); + schedule.setReleaseState(ReleaseState.OFFLINE); + schedule.setWarningType(WarningType.NONE); + schedule.setCreateTime(new Date()); + schedule.setUpdateTime(new Date()); + schedule.setProcessDefinitionId(id); + scheduleMapper.insert(schedule); + return schedule; + } + @Test public void testQueryByName() { - List workFlowLineages = workFlowLineageMapper.queryByName("test",1L); + insertOneProcessDefinition(); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L); + insertOneSchedule(processDefinition.getId()); + + List workFlowLineages = workFlowLineageMapper.queryByName(processDefinition.getName(), processDefinition.getProjectCode()); + Assert.assertNotEquals(workFlowLineages.size(), 0); + } + + @Test + public void testQueryCodeRelation() { + ProcessTaskRelation processTaskRelation = insertOneProcessTaskRelation(); + + List workFlowLineages = workFlowLineageMapper.queryCodeRelation(processTaskRelation.getPreTaskCode() + , processTaskRelation.getPreTaskVersion(), 11L, 1L); + Assert.assertNotEquals(workFlowLineages.size(), 0); + } + + @Test + public void testQueryRelationByIds() { + insertOneProcessDefinition(); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L); + insertOneProcessTaskRelation(); + + HashSet set = new HashSet<>(); + set.add(processDefinition.getId()); + List workFlowLineages = workFlowLineageMapper.queryRelationByIds(set, processDefinition.getProjectCode()); Assert.assertNotEquals(workFlowLineages.size(), 0); } + + @Test + public void testQueryWorkFlowLineageByCode() { + insertOneProcessDefinition(); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L); + insertOneSchedule(processDefinition.getId()); + + WorkFlowLineage workFlowLineages = workFlowLineageMapper.queryWorkFlowLineageByCode(processDefinition.getCode(), processDefinition.getProjectCode()); + Assert.assertNotNull(workFlowLineages); + } + } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDaoTest.java similarity index 84% rename from dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java rename to dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDaoTest.java index 2c9b80a89d..7eed867248 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDaoTest.java @@ -16,32 +16,31 @@ */ package org.apache.dolphinscheduler.dao.upgrade; -import org.junit.Test; - -import javax.sql.DataSource; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; - import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource; -import static org.hamcrest.Matchers.greaterThan; + import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertThat; -public class WokrerGrouopDaoTest { - protected final DataSource dataSource = getDataSource(); +import java.util.Map; + +import javax.sql.DataSource; + +import org.junit.Test; + +public class WorkerGroupDaoTest { + protected final DataSource dataSource = getDataSource(); @Test - public void testQueryQueryAllOldWorkerGroup() throws Exception{ + public void testQueryQueryAllOldWorkerGroup() throws Exception { WorkerGroupDao workerGroupDao = new WorkerGroupDao(); Map workerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection()); - assertThat(workerGroupMap.size(),greaterThanOrEqualTo(0)); + assertThat(workerGroupMap.size(), greaterThanOrEqualTo(0)); } @Test(expected = Exception.class) - public void testQueryQueryAllOldWorkerGroupException() throws Exception{ + public void testQueryQueryAllOldWorkerGroupException() throws Exception { WorkerGroupDao workerGroupDao = new WorkerGroupDao(); workerGroupDao.queryAllOldWorkerGroup(null);