Browse Source

[Feature][JsonSplit-api] select-by-code of processDefinition (#5687)

* select-by-code of processDefinition

* fix ut

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
5b6c9b7d43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  3. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  4. 66
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  5. 33
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 71
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DagData.java
  7. 33
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

17
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -376,26 +376,25 @@ public class ProcessDefinitionController extends BaseController {
} }
/** /**
* query datail of process definition by id * query detail of process definition by code
* *
* @param loginUser login user * @param loginUser login user
* @param projectName project name * @param projectName project name
* @param processId process definition id * @param code process definition id
* @return process definition detail * @return process definition detail
*/ */
@ApiOperation(value = "queryProcessDefinitionById", notes = "QUERY_PROCESS_DEFINITION_BY_ID_NOTES") @ApiOperation(value = "queryProcessDefinitionByCode", notes = "QUERY_PROCESS_DEFINITION_BY_ID_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "processId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "123456789")
}) })
@GetMapping(value = "/select-by-id") @GetMapping(value = "/select-by-code")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR) @ApiException(QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser") @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryProcessDefinitionById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result queryProcessDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam("processId") Integer processId @RequestParam(value = "code", required = true) long code) {
) { Map<String, Object> result = processDefinitionService.queryProcessDefinitionByCode(loginUser, projectName, code);
Map<String, Object> result = processDefinitionService.queryProcessDefinitionById(loginUser, projectName, processId);
return returnDataList(result); return returnDataList(result);
} }

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -90,17 +90,17 @@ public interface ProcessDefinitionService {
Integer userId); Integer userId);
/** /**
* query datail of process definition * query detail of process definition
* *
* @param loginUser login user * @param loginUser login user
* @param projectName project name * @param projectName project name
* @param processId process definition id * @param code process definition code
* @return process definition detail * @return process definition detail
*/ */
Map<String, Object> queryProcessDefinitionById(User loginUser, Map<String, Object> queryProcessDefinitionByCode(User loginUser,
String projectName, String projectName,
Integer processId); long code);
/** /**
* query datail of process definition * query datail of process definition
@ -112,8 +112,8 @@ public interface ProcessDefinitionService {
*/ */
Map<String, Object> queryProcessDefinitionByName(User loginUser, Map<String, Object> queryProcessDefinitionByName(User loginUser,
String projectName, String projectName,
String processDefinitionName); String processDefinitionName);
/** /**
* batch copy process definition * batch copy process definition

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StreamUtils; import org.apache.dolphinscheduler.common.utils.StreamUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@ -353,15 +354,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
/** /**
* query datail of process definition * query detail of process definition
* *
* @param loginUser login user * @param loginUser login user
* @param projectName project name * @param projectName project name
* @param processId process definition id * @param code process definition code
* @return process definition detail * @return process definition detail
*/ */
@Override @Override
public Map<String, Object> queryProcessDefinitionById(User loginUser, String projectName, Integer processId) { public Map<String, Object> queryProcessDefinitionByCode(User loginUser, String projectName, long code) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
@ -372,14 +373,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return checkResult; return checkResult;
} }
ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId); ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
} else { } else {
ProcessData processData = processService.genProcessData(processDefinition); DagData dagData = processService.genDagData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); result.put(Constants.DATA_LIST, dagData);
result.put(Constants.DATA_LIST, processDefinition);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} }
return result; return result;

66
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -73,27 +73,28 @@ public class ProcessDefinitionControllerTest {
@Test @Test
public void testCreateProcessDefinition() throws Exception { public void testCreateProcessDefinition() throws Exception {
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\"" String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ ":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\" + "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"" + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]";
+ ",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String projectName = "test"; String projectName = "test";
String name = "dag_test"; String name = "dag_test";
String description = "desc test"; String description = "desc test";
String globalParams = "[]";
String connects = "[]"; String connects = "[]";
String locations = "[]";
int timeout = 0;
String tenantCode = "root";
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1); result.put(Constants.DATA_LIST, 1);
Mockito.when(processDefinitionService.createProcessDefinition(user, projectName, name, json, Mockito.when(processDefinitionService.createProcessDefinition(user, projectName, name, description, globalParams,
description, locations, connects)).thenReturn(result); connects, locations, timeout, tenantCode, json)).thenReturn(result);
Result response = processDefinitionController.createProcessDefinition(user, projectName, name, json, Result response = processDefinitionController.createProcessDefinition(user, projectName, name, description, globalParams,
locations, connects, description); connects, locations, timeout, tenantCode, json);
Assert.assertTrue(response.isSuccess()); Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
} }
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) { private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
@ -122,28 +123,29 @@ public class ProcessDefinitionControllerTest {
@Test @Test
public void updateProcessDefinition() throws Exception { public void updateProcessDefinition() throws Exception {
String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\"" + "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ ",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"}" + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]";
+ ",\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\""
+ ":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\""
+ ":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String projectName = "test"; String projectName = "test";
String name = "dag_test"; String name = "dag_test";
String description = "desc test"; String description = "desc test";
String connects = "[]"; String connects = "[]";
int id = 1; String globalParams = "[]";
int timeout = 0;
String tenantCode = "root";
long code = 123L;
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put("processDefinitionId", 1); result.put("processDefinitionId", 1);
Mockito.when(processDefinitionService.updateProcessDefinition(user, projectName, id, name, json, Mockito.when(processDefinitionService.updateProcessDefinition(user, projectName, name, code, description, globalParams,
description, locations, connects)).thenReturn(result); connects, locations, timeout, tenantCode, json)).thenReturn(result);
Result response = processDefinitionController.updateProcessDefinition(user, projectName, name, id, json, Result response = processDefinitionController.updateProcessDefinition(user, projectName, name, code, description, globalParams,
locations, connects, description,ReleaseState.OFFLINE); connects, locations, timeout, tenantCode, json, ReleaseState.OFFLINE);
Assert.assertTrue(response != null && response.isSuccess()); } Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
@Test @Test
public void testReleaseProcessDefinition() throws Exception { public void testReleaseProcessDefinition() throws Exception {
@ -158,25 +160,19 @@ public class ProcessDefinitionControllerTest {
} }
@Test @Test
public void testQueryProcessDefinitionById() throws Exception { public void testQueryProcessDefinitionByCode() {
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1"
+ "\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}"
+ "\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\""
+ ":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":"
+ "\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String projectName = "test"; String projectName = "test";
String name = "dag_test"; String name = "dag_test";
String description = "desc test"; String description = "desc test";
String connects = "[]"; String connects = "[]";
int id = 1; long code = 1L;
ProcessDefinition processDefinition = new ProcessDefinition(); ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setProjectName(projectName); processDefinition.setProjectName(projectName);
processDefinition.setConnects(connects); processDefinition.setConnects(connects);
processDefinition.setDescription(description); processDefinition.setDescription(description);
processDefinition.setId(id); processDefinition.setCode(code);
processDefinition.setLocations(locations); processDefinition.setLocations(locations);
processDefinition.setName(name); processDefinition.setName(name);
@ -184,10 +180,10 @@ public class ProcessDefinitionControllerTest {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition); result.put(Constants.DATA_LIST, processDefinition);
Mockito.when(processDefinitionService.queryProcessDefinitionById(user, projectName, id)).thenReturn(result); Mockito.when(processDefinitionService.queryProcessDefinitionByCode(user, projectName, code)).thenReturn(result);
Result response = processDefinitionController.queryProcessDefinitionById(user, projectName, id); Result response = processDefinitionController.queryProcessDefinitionByCode(user, projectName, code);
Assert.assertTrue(response != null && response.isSuccess()); Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
} }
@Test @Test

33
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -317,7 +318,7 @@ public class ProcessDefinitionServiceTest {
} }
@Test @Test
public void testQueryProcessDefinitionById() { public void testQueryProcessDefinitionByCode() {
String projectName = "project_test1"; String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
@ -332,33 +333,24 @@ public class ProcessDefinitionServiceTest {
//project check auth fail //project check auth fail
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.queryProcessDefinitionById(loginUser, Map<String, Object> map = processDefinitionService.queryProcessDefinitionByCode(loginUser,
"project_test1", 1); "project_test1", 1L);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS)); Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
//project check auth success, instance not exist //project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName); putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.selectById(1)).thenReturn(null); DagData dagData = new DagData(getProcessDefinition(), null, null);
Mockito.when(processService.genDagData(Mockito.any())).thenReturn(dagData);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionByCode(loginUser,
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" "project_test1", 1L);
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionById(loginUser,
"project_test1", 1);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit //instance exit
Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition()); Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionById(loginUser, Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByCode(loginUser,
"project_test1", 46); "project_test1", 46L);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
} }
@ -605,8 +597,6 @@ public class ProcessDefinitionServiceTest {
// project check auth success, processs definition online // project check auth success, processs definition online
ProcessDefinition processDefinition1 = getProcessDefinition(); ProcessDefinition processDefinition1 = getProcessDefinition();
processDefinition1.setResourceIds("1,2"); processDefinition1.setResourceIds("1,2");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition1);
Mockito.when(processService.getUserById(1)).thenReturn(loginUser);
Map<String, Object> onlineWithResourceRes = processDefinitionService.releaseProcessDefinition( Map<String, Object> onlineWithResourceRes = processDefinitionService.releaseProcessDefinition(
loginUser, "project_test1", 46, ReleaseState.ONLINE); loginUser, "project_test1", 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineWithResourceRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, onlineWithResourceRes.get(Constants.STATUS));
@ -944,7 +934,6 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
String sqlDependentJson = "{\n" String sqlDependentJson = "{\n"
+ " \"globalParams\": [\n" + " \"globalParams\": [\n"

71
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DagData.java

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import java.util.List;
/**
* DagData
*/
public class DagData {
/**
* processDefinition
*/
private ProcessDefinition processDefinition;
/**
* processTaskRelationList
*/
private List<ProcessTaskRelationLog> processTaskRelationList;
/**
* processTaskRelationList
*/
private List<TaskDefinitionLog> taskDefinitionList;
public DagData(ProcessDefinition processDefinition, List<ProcessTaskRelationLog> processTaskRelationList, List<TaskDefinitionLog> taskDefinitionList) {
this.processDefinition = processDefinition;
this.processTaskRelationList = processTaskRelationList;
this.taskDefinitionList = taskDefinitionList;
}
public ProcessDefinition getProcessDefinition() {
return processDefinition;
}
public void setProcessDefinition(ProcessDefinition processDefinition) {
this.processDefinition = processDefinition;
}
public List<ProcessTaskRelationLog> getProcessTaskRelationList() {
return processTaskRelationList;
}
public void setProcessTaskRelationList(List<ProcessTaskRelationLog> processTaskRelationList) {
this.processTaskRelationList = processTaskRelationList;
}
public List<TaskDefinitionLog> getTaskDefinitionList() {
return taskDefinitionList;
}
public void setTaskDefinitionList(List<TaskDefinitionLog> taskDefinitionList) {
this.taskDefinitionList = taskDefinitionList;
}
}

33
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CycleDependency; import org.apache.dolphinscheduler.dao.entity.CycleDependency;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand; import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
@ -2264,7 +2265,7 @@ public class ProcessService {
* save task relations * save task relations
*/ */
public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion, public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion,
List<ProcessTaskRelationLog> taskRelationList) { List<ProcessTaskRelationLog> taskRelationList) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
if (!processTaskRelationList.isEmpty()) { if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
@ -2488,9 +2489,29 @@ public class ProcessService {
return DagHelper.buildDagGraph(processDag); return DagHelper.buildDagGraph(processDag);
} }
/**
* generate DagData
*/
public DagData genDagData(ProcessDefinition processDefinition) {
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion()));
}
if (processTaskRelation.getPostTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
}
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
return new DagData(processDefinition, processTaskRelations, taskDefinitionLogs);
}
/** /**
* generate ProcessData * generate ProcessData
* it will be replaced by genDagData method
*/ */
@Deprecated
public ProcessData genProcessData(ProcessDefinition processDefinition) { public ProcessData genProcessData(ProcessDefinition processDefinition) {
Map<String, String> locationMap = locationToMap(processDefinition.getLocations()); Map<String, String> locationMap = locationToMap(processDefinition.getLocations());
List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion(), locationMap); List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion(), locationMap);
@ -2562,10 +2583,6 @@ public class ProcessService {
/** /**
* find task definition by code and version * find task definition by code and version
*
* @param taskCode
* @param taskDefinitionVersion
* @return
*/ */
public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) { public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion); return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
@ -2573,10 +2590,6 @@ public class ProcessService {
/** /**
* query tasks definition list by process code and process version * query tasks definition list by process code and process version
*
* @param processCode
* @param processVersion
* @return
*/ */
public List<TaskDefinitionLog> queryTaskDefinitionList(Long processCode, int processVersion) { public List<TaskDefinitionLog> queryTaskDefinitionList(Long processCode, int processVersion) {
List<ProcessTaskRelationLog> processTaskRelationLogs = List<ProcessTaskRelationLog> processTaskRelationLogs =
@ -2617,7 +2630,7 @@ public class ProcessService {
* add authorized resources * add authorized resources
* *
* @param ownResources own resources * @param ownResources own resources
* @param userId userId * @param userId userId
*/ */
private void addAuthorizedResources(List<Resource> ownResources, int userId) { private void addAuthorizedResources(List<Resource> ownResources, int userId) {
List<Integer> relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7); List<Integer> relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7);

Loading…
Cancel
Save