Zhou Zheng
4 years ago
137 changed files with 3020 additions and 369 deletions
@ -0,0 +1,81 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.api.controller; |
||||
|
||||
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; |
||||
import org.apache.dolphinscheduler.api.utils.Result; |
||||
import org.apache.dolphinscheduler.common.utils.ParameterUtils; |
||||
import io.swagger.annotations.ApiParam; |
||||
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.web.bind.annotation.*; |
||||
import springfox.documentation.annotations.ApiIgnore; |
||||
|
||||
import java.util.HashSet; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Set; |
||||
|
||||
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_LINEAGE_ERROR; |
||||
|
||||
@RestController |
||||
@RequestMapping("lineages/{projectId}") |
||||
public class WorkFlowLineageController extends BaseController { |
||||
private static final Logger logger = LoggerFactory.getLogger(WorkFlowLineageController.class); |
||||
|
||||
@Autowired |
||||
private WorkFlowLineageService workFlowLineageService; |
||||
|
||||
@GetMapping(value="/list-name") |
||||
@ResponseStatus(HttpStatus.OK) |
||||
public Result<List<WorkFlowLineage>> queryWorkFlowLineageByName(@ApiIgnore @RequestParam(value = "searchVal", required = false) String searchVal, @ApiParam(name = "projectId", value = "PROJECT_ID", required = true) @PathVariable int projectId) { |
||||
try { |
||||
searchVal = ParameterUtils.handleEscapes(searchVal); |
||||
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByName(searchVal,projectId); |
||||
return returnDataList(result); |
||||
} catch (Exception e){ |
||||
logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(),e); |
||||
return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(), QUERY_WORKFLOW_LINEAGE_ERROR.getMsg()); |
||||
} |
||||
} |
||||
|
||||
@GetMapping(value="/list-ids") |
||||
@ResponseStatus(HttpStatus.OK) |
||||
public Result<Map<String, Object>> queryWorkFlowLineageByIds(@ApiIgnore @RequestParam(value = "ids", required = false) String ids,@ApiParam(name = "projectId", value = "PROJECT_ID", required = true) @PathVariable int projectId) { |
||||
|
||||
try { |
||||
ids = ParameterUtils.handleEscapes(ids); |
||||
Set<Integer> idsSet = new HashSet<>(); |
||||
if(ids != null) { |
||||
String[] idsStr = ids.split(","); |
||||
for (String id : idsStr) |
||||
{ |
||||
idsSet.add(Integer.parseInt(id)); |
||||
} |
||||
} |
||||
|
||||
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByIds(idsSet, projectId); |
||||
return returnDataList(result); |
||||
} catch (Exception e){ |
||||
logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(),e); |
||||
return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(), QUERY_WORKFLOW_LINEAGE_ERROR.getMsg()); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,97 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.api.service; |
||||
|
||||
import org.apache.dolphinscheduler.api.enums.Status; |
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; |
||||
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; |
||||
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import java.util.*; |
||||
|
||||
@Service |
||||
public class WorkFlowLineageService extends BaseService { |
||||
|
||||
@Autowired |
||||
private WorkFlowLineageMapper workFlowLineageMapper; |
||||
|
||||
public Map<String, Object> queryWorkFlowLineageByName(String workFlowName, int projectId) { |
||||
Map<String, Object> result = new HashMap<>(5); |
||||
List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByName(workFlowName, projectId); |
||||
result.put(Constants.DATA_LIST, workFlowLineageList); |
||||
putMsg(result, Status.SUCCESS); |
||||
return result; |
||||
} |
||||
|
||||
private List<WorkFlowRelation> getWorkFlowRelationRecursion(Set<Integer> ids, List<WorkFlowRelation> workFlowRelations,Set<Integer> sourceIds) { |
||||
for(int id : ids) { |
||||
sourceIds.addAll(ids); |
||||
List<WorkFlowRelation> workFlowRelationsTmp = workFlowLineageMapper.querySourceTarget(id); |
||||
if(workFlowRelationsTmp != null && !workFlowRelationsTmp.isEmpty()) { |
||||
Set<Integer> idsTmp = new HashSet<>(); |
||||
for(WorkFlowRelation workFlowRelation:workFlowRelationsTmp) { |
||||
if(!sourceIds.contains(workFlowRelation.getTargetWorkFlowId())){ |
||||
idsTmp.add(workFlowRelation.getTargetWorkFlowId()); |
||||
} |
||||
} |
||||
workFlowRelations.addAll(workFlowRelationsTmp); |
||||
getWorkFlowRelationRecursion(idsTmp, workFlowRelations,sourceIds); |
||||
} |
||||
} |
||||
return workFlowRelations; |
||||
} |
||||
|
||||
public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids,int projectId) { |
||||
Map<String, Object> result = new HashMap<>(5); |
||||
List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByIds(ids, projectId); |
||||
Map<String, Object> workFlowLists = new HashMap<>(5); |
||||
Set<Integer> idsV = new HashSet<>(); |
||||
if(ids == null || ids.isEmpty()){ |
||||
for(WorkFlowLineage workFlowLineage:workFlowLineageList) { |
||||
idsV.add(workFlowLineage.getWorkFlowId()); |
||||
} |
||||
} else { |
||||
idsV = ids; |
||||
} |
||||
List<WorkFlowRelation> workFlowRelations = new ArrayList<>(); |
||||
Set<Integer> sourceIds = new HashSet<>(); |
||||
getWorkFlowRelationRecursion(idsV, workFlowRelations, sourceIds); |
||||
|
||||
Set<Integer> idSet = new HashSet<>(); |
||||
//If the incoming parameter is not empty, you need to add downstream workflow detail attributes
|
||||
if(ids != null && !ids.isEmpty()) { |
||||
for(WorkFlowRelation workFlowRelation : workFlowRelations) { |
||||
idSet.add(workFlowRelation.getTargetWorkFlowId()); |
||||
} |
||||
for(int id : ids){ |
||||
idSet.remove(id); |
||||
} |
||||
if(!idSet.isEmpty()) { |
||||
workFlowLineageList.addAll(workFlowLineageMapper.queryByIds(idSet, projectId)); |
||||
} |
||||
} |
||||
|
||||
workFlowLists.put("workFlowList",workFlowLineageList); |
||||
workFlowLists.put("workFlowRelationList",workFlowRelations); |
||||
result.put(Constants.DATA_LIST, workFlowLists); |
||||
putMsg(result, Status.SUCCESS); |
||||
return result; |
||||
} |
||||
} |
@ -0,0 +1,69 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.api.controller; |
||||
|
||||
import org.apache.dolphinscheduler.api.enums.Status; |
||||
import org.apache.dolphinscheduler.api.utils.Result; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.http.MediaType; |
||||
import org.springframework.test.web.servlet.MvcResult; |
||||
import org.springframework.util.LinkedMultiValueMap; |
||||
import org.springframework.util.MultiValueMap; |
||||
|
||||
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; |
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; |
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
||||
|
||||
public class WorkFlowLineageControllerTest extends AbstractControllerTest { |
||||
private static Logger logger = LoggerFactory.getLogger(WorkFlowLineageControllerTest.class); |
||||
|
||||
@Test |
||||
public void testQueryWorkFlowLineageByName() throws Exception { |
||||
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>(); |
||||
paramsMap.add("searchVal","test"); |
||||
MvcResult mvcResult = mockMvc.perform(get("/lineages/1/list-name") |
||||
.header("sessionId", sessionId) |
||||
.params(paramsMap)) |
||||
.andExpect(status().isOk()) |
||||
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) |
||||
.andReturn(); |
||||
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); |
||||
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); |
||||
logger.info(mvcResult.getResponse().getContentAsString()); |
||||
|
||||
} |
||||
|
||||
@Test |
||||
public void testQueryWorkFlowLineageByIds() throws Exception { |
||||
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>(); |
||||
paramsMap.add("ids","1"); |
||||
MvcResult mvcResult = mockMvc.perform(get("/lineages/1/list-ids") |
||||
.header("sessionId", sessionId) |
||||
.params(paramsMap)) |
||||
.andExpect(status().isOk()) |
||||
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) |
||||
.andReturn(); |
||||
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); |
||||
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); |
||||
logger.info(mvcResult.getResponse().getContentAsString()); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,88 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.api.service; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.utils.EncryptionUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; |
||||
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; |
||||
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; |
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.InjectMocks; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
import java.util.*; |
||||
|
||||
import static org.mockito.Mockito.when; |
||||
|
||||
@RunWith(MockitoJUnitRunner.class) |
||||
public class WorkFlowLineageServiceTest { |
||||
|
||||
@InjectMocks |
||||
private WorkFlowLineageService workFlowLineageService; |
||||
|
||||
@Mock |
||||
private WorkFlowLineageMapper workFlowLineageMapper; |
||||
|
||||
@Test |
||||
public void testQueryWorkFlowLineageByName() { |
||||
String searchVal = "test"; |
||||
when(workFlowLineageMapper.queryByName(searchVal, 1)).thenReturn(getWorkFlowLineages()); |
||||
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByName(searchVal,1); |
||||
List<WorkFlowLineage> workFlowLineageList = (List<WorkFlowLineage>)result.get(Constants.DATA_LIST); |
||||
Assert.assertTrue(workFlowLineageList.size()>0); |
||||
} |
||||
|
||||
@Test |
||||
public void testQueryWorkFlowLineageByIds() { |
||||
|
||||
Set<Integer> ids = new HashSet<>(); |
||||
ids.add(1); |
||||
ids.add(2); |
||||
|
||||
when(workFlowLineageMapper.queryByIds(ids, 1)).thenReturn(getWorkFlowLineages()); |
||||
when(workFlowLineageMapper.querySourceTarget(1)).thenReturn(getWorkFlowRelation()); |
||||
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByIds(ids,1); |
||||
Map<String, Object> workFlowLists = (Map<String, Object>)result.get(Constants.DATA_LIST); |
||||
List<WorkFlowLineage> workFlowLineages = (List<WorkFlowLineage>)workFlowLists.get("workFlowList"); |
||||
List<WorkFlowRelation> workFlowRelations = (List<WorkFlowRelation>)workFlowLists.get("workFlowRelationList"); |
||||
Assert.assertTrue(workFlowLineages.size()>0); |
||||
Assert.assertTrue(workFlowRelations.size()>0); |
||||
} |
||||
|
||||
private List<WorkFlowLineage> getWorkFlowLineages() { |
||||
List<WorkFlowLineage> workFlowLineages = new ArrayList<>(); |
||||
WorkFlowLineage workFlowLineage = new WorkFlowLineage(); |
||||
workFlowLineage.setWorkFlowId(1); |
||||
workFlowLineage.setWorkFlowName("testdag"); |
||||
workFlowLineages.add(workFlowLineage); |
||||
return workFlowLineages; |
||||
} |
||||
|
||||
private List<WorkFlowRelation> getWorkFlowRelation(){ |
||||
List<WorkFlowRelation> workFlowRelations = new ArrayList<>(); |
||||
WorkFlowRelation workFlowRelation = new WorkFlowRelation(); |
||||
workFlowRelation.setSourceWorkFlowId(1); |
||||
workFlowRelation.setTargetWorkFlowId(2); |
||||
workFlowRelations.add(workFlowRelation); |
||||
return workFlowRelations; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,94 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.dao.entity; |
||||
|
||||
import java.util.Date; |
||||
|
||||
public class WorkFlowLineage { |
||||
private int workFlowId; |
||||
private String workFlowName; |
||||
private String workFlowPublishStatus; |
||||
private Date scheduleStartTime; |
||||
private Date scheduleEndTime; |
||||
private String crontab; |
||||
private int schedulePublishStatus; |
||||
private String sourceWorkFlowId; |
||||
|
||||
public String getSourceWorkFlowId() { |
||||
return sourceWorkFlowId; |
||||
} |
||||
|
||||
public void setSourceWorkFlowId(String sourceWorkFlowId) { |
||||
this.sourceWorkFlowId = sourceWorkFlowId; |
||||
} |
||||
|
||||
public int getWorkFlowId() { |
||||
return workFlowId; |
||||
} |
||||
|
||||
public void setWorkFlowId(int workFlowId) { |
||||
this.workFlowId = workFlowId; |
||||
} |
||||
|
||||
public String getWorkFlowName() { |
||||
return workFlowName; |
||||
} |
||||
|
||||
public void setWorkFlowName(String workFlowName) { |
||||
this.workFlowName = workFlowName; |
||||
} |
||||
|
||||
public String getWorkFlowPublishStatus() { |
||||
return workFlowPublishStatus; |
||||
} |
||||
|
||||
public void setWorkFlowPublishStatus(String workFlowPublishStatus) { |
||||
this.workFlowPublishStatus = workFlowPublishStatus; |
||||
} |
||||
|
||||
public Date getScheduleStartTime() { |
||||
return scheduleStartTime; |
||||
} |
||||
|
||||
public void setScheduleStartTime(Date scheduleStartTime) { |
||||
this.scheduleStartTime = scheduleStartTime; |
||||
} |
||||
|
||||
public Date getScheduleEndTime() { |
||||
return scheduleEndTime; |
||||
} |
||||
|
||||
public void setScheduleEndTime(Date scheduleEndTime) { |
||||
this.scheduleEndTime = scheduleEndTime; |
||||
} |
||||
|
||||
public String getCrontab() { |
||||
return crontab; |
||||
} |
||||
|
||||
public void setCrontab(String crontab) { |
||||
this.crontab = crontab; |
||||
} |
||||
|
||||
public int getSchedulePublishStatus() { |
||||
return schedulePublishStatus; |
||||
} |
||||
|
||||
public void setSchedulePublishStatus(int schedulePublishStatus) { |
||||
this.schedulePublishStatus = schedulePublishStatus; |
||||
} |
||||
} |
@ -0,0 +1,38 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.dao.entity; |
||||
|
||||
public class WorkFlowRelation { |
||||
private int sourceWorkFlowId; |
||||
private int targetWorkFlowId; |
||||
|
||||
public int getSourceWorkFlowId() { |
||||
return sourceWorkFlowId; |
||||
} |
||||
|
||||
public void setSourceWorkFlowId(int sourceWorkFlowId) { |
||||
this.sourceWorkFlowId = sourceWorkFlowId; |
||||
} |
||||
|
||||
public int getTargetWorkFlowId() { |
||||
return targetWorkFlowId; |
||||
} |
||||
|
||||
public void setTargetWorkFlowId(int targetWorkFlowId) { |
||||
this.targetWorkFlowId = targetWorkFlowId; |
||||
} |
||||
} |
@ -0,0 +1,32 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.dao.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; |
||||
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; |
||||
import org.apache.ibatis.annotations.Param; |
||||
import java.util.List; |
||||
import java.util.Set; |
||||
|
||||
public interface WorkFlowLineageMapper { |
||||
|
||||
public List<WorkFlowLineage> queryByName(@Param("searchVal") String searchVal, @Param("projectId") int projectId); |
||||
|
||||
public List<WorkFlowLineage> queryByIds(@Param("ids") Set<Integer> ids, @Param("projectId") int projectId); |
||||
|
||||
public List<WorkFlowRelation> querySourceTarget(@Param("id") int id); |
||||
} |
@ -0,0 +1,103 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > |
||||
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper"> |
||||
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage"> |
||||
select tepd.id as work_flow_id,tepd.name as work_flow_name |
||||
from t_ds_process_definition tepd |
||||
left join t_ds_schedules tes on tepd.id = tes.process_definition_id |
||||
where tepd.project_id = #{projectId} |
||||
<if test="searchVal != null and searchVal != ''"> |
||||
and tepd.name like concat('%', #{searchVal}, '%') |
||||
</if> |
||||
</select> |
||||
<select id="queryByIds" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage" databaseId="mysql"> |
||||
select tepd.id as work_flow_id,tepd.name as work_flow_name, |
||||
(case when json_extract(tepd.process_definition_json, '$**.dependItemList') is not null then 1 else 0 end) as is_depend_work_flow, |
||||
json_extract(tepd.process_definition_json, '$**.definitionId') as source_work_flow_id, |
||||
tepd.release_state as work_flow_publish_status, |
||||
tes.start_time as schedule_start_time, |
||||
tes.end_time as schedule_end_time, |
||||
tes.crontab as crontab, |
||||
tes.release_state as schedule_publish_status |
||||
from t_ds_process_definition tepd |
||||
left join t_ds_schedules tes on tepd.id = tes.process_definition_id |
||||
where tepd.project_id = #{projectId} |
||||
<if test="ids != null and ids.size()>0"> |
||||
and tepd.id in |
||||
<foreach collection="ids" index="index" item="i" open="(" separator="," close=")"> |
||||
#{i} |
||||
</foreach> |
||||
</if> |
||||
</select> |
||||
|
||||
<select id="queryByIds" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage" databaseId="pg"> |
||||
select a.work_flow_id, |
||||
a.work_flow_name, |
||||
a.is_depend_work_flow, |
||||
array_agg(a.source_id) as source_id, |
||||
a.work_flow_publish_status, |
||||
a.schedule_start_time, |
||||
a.schedule_end_time, |
||||
a.crontab, |
||||
a.schedule_publish_status |
||||
from ( |
||||
select tepd.id as work_flow_id,tepd.name as work_flow_name, |
||||
case when tepd.process_definition_json::json#>'{tasks,1,dependence}' is not null then 1 else 0 end as is_depend_work_flow, |
||||
(json_array_elements(tepd.process_definition_json::json#>'{tasks}')#>>'{dependence,dependTaskList,0,dependItemList,0,definitionId}') as source_id, |
||||
tepd.release_state as work_flow_publish_status, |
||||
tes.start_time as schedule_start_time, |
||||
tes.end_time as schedule_end_time, |
||||
tes.crontab as crontab, |
||||
tes.release_state as schedule_publish_status |
||||
from t_ds_process_definition tepd |
||||
left join t_ds_schedules tes on tepd.id = tes.process_definition_id |
||||
where tepd.project_id = #{projectId} |
||||
<if test="ids != null and ids.size()>0"> |
||||
and tepd.id in |
||||
<foreach collection="ids" index="index" item="i" open="(" separator="," close=")"> |
||||
#{i} |
||||
</foreach> |
||||
</if> |
||||
) a |
||||
where (a.is_depend_work_flow = 1 and source_id is not null) or (a.is_depend_work_flow = 0) |
||||
group by a.work_flow_id,a.work_flow_name,a.is_depend_work_flow,a.work_flow_publish_status,a.schedule_start_time, |
||||
a.schedule_end_time,a.crontab,a.schedule_publish_status |
||||
</select> |
||||
|
||||
<select id="querySourceTarget" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelation" databaseId="mysql"> |
||||
select id as target_work_flow_id,#{id} as source_work_flow_id |
||||
from t_ds_process_definition t |
||||
where json_extract(t.process_definition_json, '$**.dependItemList') is not null |
||||
and find_in_set(#{id}, replace(replace(replace(json_extract(t.process_definition_json, '$**.definitionId'), '[', ''),']', ''), ' ', '')) > 0 |
||||
</select> |
||||
|
||||
<select id="querySourceTarget" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelation" databaseId="pg"> |
||||
select a.work_flow_id as target_work_flow_id, |
||||
a.source_id as source_work_flow_id |
||||
from ( |
||||
select tepd.id as work_flow_id, |
||||
(json_array_elements(tepd.process_definition_json::json#>'{tasks}')#>>'{dependence,dependTaskList,0,dependItemList,0,definitionId}') as source_id |
||||
from t_ds_process_definition tepd |
||||
left join t_ds_schedules tes on tepd.id = tes.process_definition_id |
||||
where tepd.project_id = 1) a |
||||
where source_id = #{id}::text; |
||||
</select> |
||||
|
||||
</mapper> |
@ -0,0 +1,62 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.dao.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; |
||||
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; |
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.boot.test.context.SpringBootTest; |
||||
import org.springframework.test.annotation.Rollback; |
||||
import org.springframework.test.context.junit4.SpringRunner; |
||||
import org.springframework.transaction.annotation.Transactional; |
||||
|
||||
import java.util.HashSet; |
||||
import java.util.List; |
||||
import java.util.Set; |
||||
|
||||
@RunWith(SpringRunner.class) |
||||
@SpringBootTest |
||||
@Transactional |
||||
@Rollback(true) |
||||
public class WorkFlowLineageMapperTest { |
||||
@Autowired |
||||
private WorkFlowLineageMapper workFlowLineageMapper; |
||||
|
||||
@Test |
||||
public void testQueryByName() { |
||||
List<WorkFlowLineage> workFlowLineages = workFlowLineageMapper.queryByName("test",1); |
||||
Assert.assertNotEquals(workFlowLineages.size(), 0); |
||||
} |
||||
|
||||
|
||||
@Test |
||||
public void testQueryByIds() { |
||||
Set<Integer> ids = new HashSet<>(); |
||||
ids.add(1); |
||||
List<WorkFlowLineage> workFlowLineages = workFlowLineageMapper.queryByIds(ids,1); |
||||
Assert.assertNotEquals(workFlowLineages.size(), 0); |
||||
} |
||||
|
||||
@Test |
||||
public void testQuerySourceTarget() { |
||||
List<WorkFlowRelation> workFlowRelations = workFlowLineageMapper.querySourceTarget(1); |
||||
Assert.assertNotEquals(workFlowRelations.size(), 0); |
||||
} |
||||
} |
@ -0,0 +1,119 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.service.zk; |
||||
|
||||
import org.apache.commons.lang.StringUtils; |
||||
import org.apache.curator.framework.CuratorFramework; |
||||
import org.apache.curator.framework.CuratorFrameworkFactory; |
||||
import org.apache.curator.framework.api.ACLProvider; |
||||
import org.apache.curator.framework.state.ConnectionState; |
||||
import org.apache.curator.retry.ExponentialBackoffRetry; |
||||
import org.apache.zookeeper.ZooDefs; |
||||
import org.apache.zookeeper.data.ACL; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.InitializingBean; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.List; |
||||
|
||||
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; |
||||
|
||||
/** |
||||
* Shared Curator zookeeper client |
||||
*/ |
||||
@Component |
||||
public class CuratorZookeeperClient implements InitializingBean { |
||||
private final Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class); |
||||
|
||||
@Autowired |
||||
private ZookeeperConfig zookeeperConfig; |
||||
|
||||
private CuratorFramework zkClient; |
||||
|
||||
|
||||
@Override |
||||
public void afterPropertiesSet() throws Exception { |
||||
this.zkClient = buildClient(); |
||||
initStateLister(); |
||||
} |
||||
|
||||
private CuratorFramework buildClient() { |
||||
logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList()); |
||||
|
||||
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null"))) |
||||
.retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())); |
||||
|
||||
//these has default value
|
||||
if (0 != zookeeperConfig.getSessionTimeoutMs()) { |
||||
builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs()); |
||||
} |
||||
if (0 != zookeeperConfig.getConnectionTimeoutMs()) { |
||||
builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs()); |
||||
} |
||||
if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) { |
||||
builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() { |
||||
|
||||
@Override |
||||
public List<ACL> getDefaultAcl() { |
||||
return ZooDefs.Ids.CREATOR_ALL_ACL; |
||||
} |
||||
|
||||
@Override |
||||
public List<ACL> getAclForPath(final String path) { |
||||
return ZooDefs.Ids.CREATOR_ALL_ACL; |
||||
} |
||||
}); |
||||
} |
||||
zkClient = builder.build(); |
||||
zkClient.start(); |
||||
try { |
||||
zkClient.blockUntilConnected(); |
||||
} catch (final Exception ex) { |
||||
throw new RuntimeException(ex); |
||||
} |
||||
return zkClient; |
||||
} |
||||
|
||||
public void initStateLister() { |
||||
checkNotNull(zkClient); |
||||
|
||||
zkClient.getConnectionStateListenable().addListener((client, newState) -> { |
||||
if(newState == ConnectionState.LOST){ |
||||
logger.error("connection lost from zookeeper"); |
||||
} else if(newState == ConnectionState.RECONNECTED){ |
||||
logger.info("reconnected to zookeeper"); |
||||
} else if(newState == ConnectionState.SUSPENDED){ |
||||
logger.warn("connection SUSPENDED to zookeeper"); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
public ZookeeperConfig getZookeeperConfig() { |
||||
return zookeeperConfig; |
||||
} |
||||
|
||||
public void setZookeeperConfig(ZookeeperConfig zookeeperConfig) { |
||||
this.zookeeperConfig = zookeeperConfig; |
||||
} |
||||
|
||||
public CuratorFramework getZkClient() { |
||||
return zkClient; |
||||
} |
||||
} |
@ -0,0 +1,52 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
|
||||
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html --> |
||||
<configuration scan="true" scanPeriod="120 seconds"> <!--debug="true" --> |
||||
|
||||
<property name="log.base" value="logs"/> |
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> |
||||
<encoder> |
||||
<pattern> |
||||
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n |
||||
</pattern> |
||||
<charset>UTF-8</charset> |
||||
</encoder> |
||||
</appender> |
||||
|
||||
<appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> |
||||
<file>${log.base}/dolphinscheduler-zookeeper.log</file> |
||||
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> |
||||
<fileNamePattern>${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern> |
||||
<maxHistory>20</maxHistory> |
||||
<maxFileSize>64MB</maxFileSize> |
||||
</rollingPolicy> |
||||
<encoder> |
||||
<pattern> |
||||
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n |
||||
</pattern> |
||||
<charset>UTF-8</charset> |
||||
</encoder> |
||||
</appender> |
||||
|
||||
<root level="INFO"> |
||||
<appender-ref ref="STDOUT"/> |
||||
<appender-ref ref="LOGFILE"/> |
||||
</root> |
||||
|
||||
</configuration> |
@ -0,0 +1,67 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.service.zk; |
||||
|
||||
import org.junit.After; |
||||
import org.junit.Assert; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
|
||||
import java.io.IOException; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
public class CuratorZookeeperClientTest { |
||||
private static ZKServer zkServer; |
||||
|
||||
@Before |
||||
public void before() throws IOException { |
||||
new Thread(() -> { |
||||
if (zkServer == null) { |
||||
zkServer = new ZKServer(); |
||||
} |
||||
zkServer.startLocalZkServer(2185); |
||||
}).start(); |
||||
} |
||||
|
||||
@After |
||||
public void after() { |
||||
if (zkServer != null) { |
||||
zkServer.stop(); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void testAfterPropertiesSet() throws Exception { |
||||
TimeUnit.SECONDS.sleep(10); |
||||
CuratorZookeeperClient zookeeperClient = new CuratorZookeeperClient(); |
||||
ZookeeperConfig zookeeperConfig = new ZookeeperConfig(); |
||||
zookeeperConfig.setServerList("127.0.0.1:2185"); |
||||
zookeeperConfig.setBaseSleepTimeMs(100); |
||||
zookeeperConfig.setMaxSleepMs(30000); |
||||
zookeeperConfig.setMaxRetries(10); |
||||
zookeeperConfig.setSessionTimeoutMs(60000); |
||||
zookeeperConfig.setConnectionTimeoutMs(30000); |
||||
zookeeperConfig.setDigest(" "); |
||||
zookeeperConfig.setDsRoot("/dolphinscheduler"); |
||||
zookeeperConfig.setMaxWaitTime(30000); |
||||
zookeeperClient.setZookeeperConfig(zookeeperConfig); |
||||
System.out.println("start"); |
||||
zookeeperClient.afterPropertiesSet(); |
||||
System.out.println("end"); |
||||
Assert.assertNotNull(zookeeperClient.getZkClient()); |
||||
} |
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue