Browse Source

[Feature][JsonSplit] modify updateProcessDefinition/switchProcessDefinitionVersion (#4666)

* processdefinition create/delete method

* init

* add relation parse

* delete process_definition_json

* modify updateProcessDefinition/switchProcessDefinitionVersion

* codestyle

* codestyle

* fix compile error (version type)

* fix compile error (version type)
pull/3/MERGE
Simon 4 years ago committed by GitHub
parent
commit
c511658f6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionService.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  3. 181
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  4. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
  5. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionServiceTest.java
  7. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  8. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
  9. 89
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
  10. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
  11. 27
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
  12. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionService.java

@ -31,7 +31,7 @@ public interface ProcessDefinitionVersionService {
* @param processDefinition the process definition that need to record version * @param processDefinition the process definition that need to record version
* @return the newest version number of this process definition * @return the newest version number of this process definition
*/ */
long addProcessDefinitionVersion(ProcessDefinition processDefinition); int addProcessDefinitionVersion(ProcessDefinition processDefinition);
/** /**
* query the pagination versions info by one certain process definition id * query the pagination versions info by one certain process definition id

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -479,7 +479,7 @@ public class ProcessInstanceService extends BaseService {
processDefinition.setUpdateTime(new Date()); processDefinition.setUpdateTime(new Date());
// add process definition version // add process definition version
long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); int version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
processDefinition.setVersion(version); processDefinition.setVersion(version);
updateDefine = processDefineMapper.updateById(processDefinition); updateDefine = processDefineMapper.updateById(processDefinition);
} }

181
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
@ -64,10 +63,10 @@ import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionVersion;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
@ -135,9 +134,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Autowired @Autowired
private ProjectService projectService; private ProjectService projectService;
@Autowired
private ProcessDefinitionVersionService processDefinitionVersionService;
@Autowired @Autowired
private TaskDefinitionService taskDefinitionService; private TaskDefinitionService taskDefinitionService;
@ -148,7 +144,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
private ProcessDefinitionLogMapper processDefinitionLogMapper; private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Autowired @Autowired
private ProcessDefinitionMapper processDefineMapper; private ProcessDefinitionMapper processDefinitionMapper;
@Autowired @Autowired
private ProcessInstanceService processInstanceService; private ProcessInstanceService processInstanceService;
@ -212,6 +208,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
} }
processDefinition.setName(name); processDefinition.setName(name);
processDefinition.setVersion(1);
processDefinition.setReleaseState(ReleaseState.OFFLINE); processDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefinition.setUserId(loginUser.getId()); processDefinition.setUserId(loginUser.getId());
processDefinition.setDescription(desc); processDefinition.setDescription(desc);
@ -220,7 +217,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
processDefinition.setTimeout(processData.getTimeout()); processDefinition.setTimeout(processData.getTimeout());
processDefinition.setTenantId(processData.getTenantId()); processDefinition.setTenantId(processData.getTenantId());
processDefinition.setModifyBy(loginUser.getUserName()); processDefinition.setModifyBy(loginUser.getUserName());
processDefinition.setResourceIds(getResourceIds(processData));
//custom global params //custom global params
List<Property> globalParamsList = processData.getGlobalParams(); List<Property> globalParamsList = processData.getGlobalParams();
@ -234,7 +230,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
processDefinition.setFlag(Flag.YES); processDefinition.setFlag(Flag.YES);
// save the new process definition // save the new process definition
processDefineMapper.insert(processDefinition); processDefinitionMapper.insert(processDefinition);
// parse and save the taskDefinition and processTaskRelation // parse and save the taskDefinition and processTaskRelation
try { try {
@ -273,11 +269,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
processDefinitionLog.setOperateTime(now); processDefinitionLog.setOperateTime(now);
processDefinitionLogMapper.insert(processDefinitionLog); processDefinitionLogMapper.insert(processDefinitionLog);
// add process definition version
long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
processDefinition.setVersion(version);
processDefineMapper.updateVersionByProcessDefinitionId(processDefinition.getId(), version);
// return processDefinition object with ID // return processDefinition object with ID
result.put(Constants.DATA_LIST, processDefinition.getId()); result.put(Constants.DATA_LIST, processDefinition.getId());
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
@ -341,7 +332,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult; return checkResult;
} }
List<ProcessDefinition> resourceList = processDefineMapper.queryAllDefinitionList(project.getId()); List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(project.getId());
result.put(Constants.DATA_LIST, resourceList); result.put(Constants.DATA_LIST, resourceList);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
@ -372,7 +363,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
} }
Page<ProcessDefinition> page = new Page<>(pageNo, pageSize); Page<ProcessDefinition> page = new Page<>(pageNo, pageSize);
IPage<ProcessDefinition> processDefinitionIPage = processDefineMapper.queryDefineListPaging( IPage<ProcessDefinition> processDefinitionIPage = processDefinitionMapper.queryDefineListPaging(
page, searchVal, userId, project.getId(), isAdmin(loginUser)); page, searchVal, userId, project.getId(), isAdmin(loginUser));
PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(pageNo, pageSize); PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(pageNo, pageSize);
@ -404,7 +395,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult; return checkResult;
} }
ProcessDefinition processDefinition = processDefineMapper.selectById(processId); ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId);
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else { } else {
@ -426,7 +417,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult; return checkResult;
} }
ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), processDefinitionName); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getId(), processDefinitionName);
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName);
} else { } else {
@ -472,43 +463,55 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) { if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) {
return checkProcessJson; return checkProcessJson;
} }
ProcessDefinition processDefine = processService.findProcessDefineById(id); // TODO processDefinitionMapper.queryByCode
ProcessDefinition processDefinition = processService.findProcessDefineById(id);
// check process definition exists // check process definition exists
if (processDefine == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id);
return result; return result;
} }
if (processDefine.getReleaseState() == ReleaseState.ONLINE) { if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
// online can not permit edit // online can not permit edit
putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefine.getName()); putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
return result; return result;
} }
if (!name.equals(processDefine.getName())) { if (!name.equals(processDefinition.getName())) {
// check whether the new process define name exist // check whether the new process define name exist
ProcessDefinition definition = processDefineMapper.verifyByDefineName(project.getId(), name); ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getId(), name);
if (definition != null) { if (definition != null) {
putMsg(result, Status.VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR, name); putMsg(result, Status.VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR, name);
return result; return result;
} }
} }
// get the processdefinitionjson before saving,and then save the name and taskid // get the processdefinitionjson before saving,and then save the name and taskid
String oldJson = processDefine.getProcessDefinitionJson(); String oldJson = processDefinition.getProcessDefinitionJson();
processDefinitionJson = processService.changeJson(processData,oldJson); processDefinitionJson = processService.changeJson(processData, oldJson);
Date now = new Date();
processDefine.setId(id); // update TaskDefinition
processDefine.setName(name); ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
processDefine.setReleaseState(ReleaseState.OFFLINE); List<TaskNode> taskNodeList = (newProcessData.getTasks() == null) ? new ArrayList<>() : newProcessData.getTasks();
processDefine.setProjectId(project.getId());
processDefine.setProcessDefinitionJson(processDefinitionJson); for (TaskNode task : taskNodeList) {
processDefine.setDescription(desc); // TODO update by code directly
processDefine.setLocations(locations); Map<String, Object> stringObjectMap = taskDefinitionService.queryTaskDefinitionByName(loginUser, projectName, task.getName());
processDefine.setConnects(connects); TaskDefinition taskDefinition = (TaskDefinition) stringObjectMap.get(Constants.DATA_LIST);
processDefine.setTimeout(processData.getTimeout()); taskDefinitionService.updateTaskDefinition(loginUser, projectName, taskDefinition.getCode(), JSONUtils.toJsonString(task));
processDefine.setTenantId(processData.getTenantId()); }
processDefine.setModifyBy(loginUser.getUserName());
processDefine.setResourceIds(getResourceIds(processData)); List<ProcessDefinitionLog> processDefinitionLogs = processDefinitionLogMapper.queryByDefinitionCode(processDefinition.getCode());
int version = getNextVersion(processDefinitionLogs);
Date now = new Date();
processDefinition.setVersion(version);
processDefinition.setName(name);
processDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefinition.setProjectCode(project.getCode());
processDefinition.setDescription(desc);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(processData.getTimeout());
processDefinition.setTenantId(processData.getTenantId());
//custom global params //custom global params
List<Property> globalParamsList = new ArrayList<>(); List<Property> globalParamsList = new ArrayList<>();
@ -516,23 +519,39 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams()); Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
globalParamsList = new ArrayList<>(userDefParamsSet); globalParamsList = new ArrayList<>(userDefParamsSet);
} }
processDefine.setGlobalParamList(globalParamsList); processDefinition.setGlobalParamList(globalParamsList);
processDefine.setUpdateTime(now); processDefinition.setUpdateTime(now);
processDefine.setFlag(Flag.YES); processDefinition.setFlag(Flag.YES);
// add process definition version
long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefine);
processDefine.setVersion(version);
if (processDefineMapper.updateById(processDefine) > 0) { processDefinition.setVersion(version);
int update = processDefinitionMapper.updateById(processDefinition);
// save processDefinitionLog
ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
processDefinitionLog.setOperator(loginUser.getId());
processDefinitionLog.setOperateTime(now);
int insert = processDefinitionLogMapper.insert(processDefinitionLog);
if (update > 0 && insert > 0) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefineMapper.queryByDefineId(id)); result.put(Constants.DATA_LIST, processDefinition);
} else { } else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
return result; return result;
} }
private int getNextVersion(List<ProcessDefinitionLog> processDefinitionLogs) {
return processDefinitionLogs
.stream()
.map(ProcessDefinitionLog::getVersion)
.max((x, y) -> x > y ? x : y)
.orElse(0) + 1;
}
/** /**
* verify process definition name unique * verify process definition name unique
* *
@ -552,7 +571,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
if (resultEnum != Status.SUCCESS) { if (resultEnum != Status.SUCCESS) {
return checkResult; return checkResult;
} }
ProcessDefinition processDefinition = processDefineMapper.verifyByDefineName(project.getId(), name); ProcessDefinition processDefinition = processDefinitionMapper.verifyByDefineName(project.getId(), name);
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
@ -582,7 +601,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult; return checkResult;
} }
ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId); ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId);
// TODO: replace id to code // TODO: replace id to code
// ProcessDefinition processDefinition = processDefineMapper.selectByCode(processDefinitionCode); // ProcessDefinition processDefinition = processDefineMapper.selectByCode(processDefinitionCode);
@ -628,7 +647,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
// TODO: replace id to code // TODO: replace id to code
// ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode); // ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode);
int delete = processDefineMapper.deleteById(processDefinitionId); int delete = processDefinitionMapper.deleteById(processDefinitionId);
if (delete > 0) { if (delete > 0) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
@ -665,7 +684,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return result; return result;
} }
ProcessDefinition processDefinition = processDefineMapper.selectById(id); ProcessDefinition processDefinition = processDefinitionMapper.selectById(id);
switch (releaseState) { switch (releaseState) {
case ONLINE: case ONLINE:
@ -684,11 +703,11 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
} }
processDefinition.setReleaseState(releaseState); processDefinition.setReleaseState(releaseState);
processDefineMapper.updateById(processDefinition); processDefinitionMapper.updateById(processDefinition);
break; break;
case OFFLINE: case OFFLINE:
processDefinition.setReleaseState(releaseState); processDefinition.setReleaseState(releaseState);
processDefineMapper.updateById(processDefinition); processDefinitionMapper.updateById(processDefinition);
List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray( List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray(
new int[]{processDefinition.getId()} new int[]{processDefinition.getId()}
); );
@ -748,7 +767,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
for (String strProcessDefinitionId : processDefinitionIdArray) { for (String strProcessDefinitionId : processDefinitionIdArray) {
//get workflow info //get workflow info
int processDefinitionId = Integer.parseInt(strProcessDefinitionId); int processDefinitionId = Integer.parseInt(strProcessDefinitionId);
ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId);
if (null != processDefinition) { if (null != processDefinition) {
processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition));
} }
@ -1143,14 +1162,14 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
//get sub process info //get sub process info
ObjectNode subParams = (ObjectNode) taskNode.path("params"); ObjectNode subParams = (ObjectNode) taskNode.path("params");
Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt();
ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); ProcessDefinition subProcess = processDefinitionMapper.queryByDefineId(subProcessId);
//check is sub process exist in db //check is sub process exist in db
if (null == subProcess) { if (null == subProcess) {
continue; continue;
} }
String subProcessJson = subProcess.getProcessDefinitionJson(); String subProcessJson = subProcess.getProcessDefinitionJson();
//check current project has sub process //check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); ProcessDefinition currentProjectSubProcess = processDefinitionMapper.queryByDefineName(targetProject.getId(), subProcess.getName());
if (null == currentProjectSubProcess) { if (null == currentProjectSubProcess) {
ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS);
@ -1194,12 +1213,12 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
processDefine.setUpdateTime(now); processDefine.setUpdateTime(now);
processDefine.setFlag(subProcess.getFlag()); processDefine.setFlag(subProcess.getFlag());
processDefine.setWarningGroupId(subProcess.getWarningGroupId()); processDefine.setWarningGroupId(subProcess.getWarningGroupId());
processDefineMapper.insert(processDefine); processDefinitionMapper.insert(processDefine);
logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName()); logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName());
//modify task node //modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName()); ProcessDefinition newSubProcessDefine = processDefinitionMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName());
if (null != newSubProcessDefine) { if (null != newSubProcessDefine) {
subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); subProcessIdMap.put(subProcessId, newSubProcessDefine.getId());
@ -1273,7 +1292,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) { public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); ProcessDefinition processDefinition = processDefinitionMapper.selectById(defineId);
if (processDefinition == null) { if (processDefinition == null) {
logger.info("process define not exists"); logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId);
@ -1317,7 +1336,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
idIntList.add(Integer.parseInt(definitionId)); idIntList.add(Integer.parseInt(definitionId));
} }
Integer[] idArray = idIntList.toArray(new Integer[0]); Integer[] idArray = idIntList.toArray(new Integer[0]);
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray); List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray);
if (CollectionUtils.isEmpty(processDefinitionList)) { if (CollectionUtils.isEmpty(processDefinitionList)) {
logger.info("process definition not exists"); logger.info("process definition not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList);
@ -1349,7 +1368,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
HashMap<String, Object> result = new HashMap<>(5); HashMap<String, Object> result = new HashMap<>(5);
List<ProcessDefinition> resourceList = processDefineMapper.queryAllDefinitionList(projectId); List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(projectId);
result.put(Constants.DATA_LIST, resourceList); result.put(Constants.DATA_LIST, resourceList);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
@ -1368,7 +1387,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
public Map<String, Object> viewTree(Integer processId, Integer limit) throws Exception { public Map<String, Object> viewTree(Integer processId, Integer limit) throws Exception {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefineMapper.selectById(processId); ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId);
if (null == processDefinition) { if (null == processDefinition) {
logger.info("process define not exists"); logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition);
@ -1540,7 +1559,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
} }
private String recursionProcessDefinitionName(Integer projectId, String processDefinitionName, int num) { private String recursionProcessDefinitionName(Integer projectId, String processDefinitionName, int num) {
ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectId, processDefinitionName);
if (processDefinition != null) { if (processDefinition != null) {
if (num > 1) { if (num > 1) {
String str = processDefinitionName.substring(0, processDefinitionName.length() - 3); String str = processDefinitionName.substring(0, processDefinitionName.length() - 3);
@ -1560,7 +1579,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
ProcessDefinition processDefinition = processDefineMapper.selectById(processId); ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId);
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
return result; return result;
@ -1695,7 +1714,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult; return checkResult;
} }
ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId);
if (Objects.isNull(processDefinition)) { if (Objects.isNull(processDefinition)) {
putMsg(result putMsg(result
, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR , Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR
@ -1703,28 +1722,28 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return result; return result;
} }
ProcessDefinitionVersion processDefinitionVersion = processDefinitionVersionService ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
.queryByProcessDefinitionIdAndVersion(processDefinitionId, version); .queryByDefinitionCodeAndVersion(processDefinition.getCode(),version);
if (Objects.isNull(processDefinitionVersion)) {
if (Objects.isNull(processDefinitionLog)) {
putMsg(result putMsg(result
, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR , Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR
, processDefinitionId , processDefinition.getCode()
, version); , version);
return result; return result;
} }
processDefinition.setVersion(processDefinitionVersion.getVersion()); processDefinition.setVersion(processDefinitionLog.getVersion());
processDefinition.setProcessDefinitionJson(processDefinitionVersion.getProcessDefinitionJson()); processDefinition.setDescription(processDefinitionLog.getDescription());
processDefinition.setDescription(processDefinitionVersion.getDescription()); processDefinition.setLocations(processDefinitionLog.getLocations());
processDefinition.setLocations(processDefinitionVersion.getLocations()); processDefinition.setConnects(processDefinitionLog.getConnects());
processDefinition.setConnects(processDefinitionVersion.getConnects()); processDefinition.setTimeout(processDefinitionLog.getTimeout());
processDefinition.setTimeout(processDefinitionVersion.getTimeout()); processDefinition.setGlobalParams(processDefinitionLog.getGlobalParams());
processDefinition.setGlobalParams(processDefinitionVersion.getGlobalParams());
processDefinition.setUpdateTime(new Date()); processDefinition.setUpdateTime(new Date());
processDefinition.setWarningGroupId(processDefinitionVersion.getWarningGroupId()); processDefinition.setWarningGroupId(processDefinitionLog.getWarningGroupId());
processDefinition.setResourceIds(processDefinitionVersion.getResourceIds()); processDefinition.setResourceIds(processDefinitionLog.getResourceIds());
if (processDefineMapper.updateById(processDefinition) > 0) { if (processDefinitionMapper.updateById(processDefinition) > 0) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
@ -1784,7 +1803,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
* @param processDefinitionId processDefinitionId * @param processDefinitionId processDefinitionId
*/ */
private void setFailedProcessList(List<String> failedProcessList, String processDefinitionId) { private void setFailedProcessList(List<String> failedProcessList, String processDefinitionId) {
ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(Integer.valueOf(processDefinitionId)); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(Integer.valueOf(processDefinitionId));
if (processDefinition != null) { if (processDefinition != null) {
failedProcessList.add(processDefinitionId + "[" + processDefinition.getName() + "]"); failedProcessList.add(processDefinitionId + "[" + processDefinition.getName() + "]");
} else { } else {
@ -1823,7 +1842,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefineMapper.selectById(processId); ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId);
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
return result; return result;
@ -1831,7 +1850,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
processDefinition.setProjectId(targetProject.getId()); processDefinition.setProjectId(targetProject.getId());
processDefinition.setUpdateTime(new Date()); processDefinition.setUpdateTime(new Date());
if (processDefineMapper.updateById(processDefinition) > 0) { if (processDefinitionMapper.updateById(processDefinition) > 0) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java

@ -61,7 +61,7 @@ public class ProcessDefinitionVersionServiceImpl extends BaseService implements
* @param processDefinition the process definition that need to record version * @param processDefinition the process definition that need to record version
* @return the newest version number of this process definition * @return the newest version number of this process definition
*/ */
public long addProcessDefinitionVersion(ProcessDefinition processDefinition) { public int addProcessDefinitionVersion(ProcessDefinition processDefinition) {
long version = this.queryMaxVersionByProcessDefinitionId(processDefinition.getId()) + 1; long version = this.queryMaxVersionByProcessDefinitionId(processDefinition.getId()) + 1;
@ -82,7 +82,7 @@ public class ProcessDefinitionVersionServiceImpl extends BaseService implements
processDefinitionVersionMapper.insert(processDefinitionVersion); processDefinitionVersionMapper.insert(processDefinitionVersion);
return version; return Integer.parseInt(String.valueOf(version));
} }
/** /**

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -969,7 +969,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition); Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
Mockito.when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1L); Mockito.when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1);
String sqlDependentJson = "{\n" String sqlDependentJson = "{\n"
+ " \"globalParams\": [\n" + " \"globalParams\": [\n"

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionServiceTest.java

@ -68,7 +68,7 @@ public class ProcessDefinitionVersionServiceTest {
.queryMaxVersionByProcessDefinitionId(processDefinition.getId())) .queryMaxVersionByProcessDefinitionId(processDefinition.getId()))
.thenReturn(expectedVersion); .thenReturn(expectedVersion);
long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); int version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
Assert.assertEquals(expectedVersion + 1, version); Assert.assertEquals(expectedVersion + 1, version);
} }

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -369,7 +369,7 @@ public class ProcessInstanceServiceTest {
when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant); when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
when(processService.updateProcessInstance(processInstance)).thenReturn(1); when(processService.updateProcessInstance(processInstance)).thenReturn(1);
when(processDefinitionService.checkProcessNodeList(Mockito.any(), eq(shellJson))).thenReturn(result); when(processDefinitionService.checkProcessNodeList(Mockito.any(), eq(shellJson))).thenReturn(result);
when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1L); when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1);
Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectName, 1, Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectName, 1,
shellJson, "2020-02-21 00:00:00", true, Flag.YES, "", ""); shellJson, "2020-02-21 00:00:00", true, Flag.YES, "", "");
Assert.assertEquals(Status.UPDATE_PROCESS_INSTANCE_ERROR, processInstanceFinishRes.get(Constants.STATUS)); Assert.assertEquals(Status.UPDATE_PROCESS_INSTANCE_ERROR, processInstanceFinishRes.get(Constants.STATUS));

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java

@ -61,7 +61,7 @@ public class ProcessDefinition {
/** /**
* version * version
*/ */
private long version; private int version;
/** /**
* release state : online/offline * release state : online/offline
@ -192,11 +192,11 @@ public class ProcessDefinition {
this.name = name; this.name = name;
} }
public long getVersion() { public int getVersion() {
return version; return version;
} }
public void setVersion(long version) { public void setVersion(int version) {
this.version = version; this.version = version;
} }

89
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java

@ -61,7 +61,7 @@ public class ProcessDefinitionLog {
/** /**
* version * version
*/ */
private long version; private int version;
/** /**
* release state : online/offline * release state : online/offline
@ -165,6 +165,17 @@ public class ProcessDefinitionLog {
*/ */
private String modifyBy; private String modifyBy;
/**
* warningGroupId
*/
@TableField(exist = false)
private int warningGroupId;
/**
* connects array for web
*/
private String connects;
/** /**
* resource ids * resource ids
*/ */
@ -181,6 +192,22 @@ public class ProcessDefinitionLog {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date operateTime; private Date operateTime;
public int getWarningGroupId() {
return warningGroupId;
}
public void setWarningGroupId(int warningGroupId) {
this.warningGroupId = warningGroupId;
}
public String getConnects() {
return connects;
}
public void setConnects(String connects) {
this.connects = connects;
}
public int getOperator() { public int getOperator() {
return operator; return operator;
} }
@ -221,11 +248,11 @@ public class ProcessDefinitionLog {
this.name = name; this.name = name;
} }
public long getVersion() { public int getVersion() {
return version; return version;
} }
public void setVersion(long version) { public void setVersion(int version) {
this.version = version; this.version = version;
} }
@ -403,32 +430,34 @@ public class ProcessDefinitionLog {
@Override @Override
public String toString() { public String toString() {
return "ProcessDefinitionLog{" return "ProcessDefinitionLog{"
+ "id=" + id + "id=" + id
+ ", code=" + code + ", code=" + code
+ ", name='" + name + '\'' + ", name='" + name + '\''
+ ", version=" + version + ", version=" + version
+ ", releaseState=" + releaseState + ", releaseState=" + releaseState
+ ", projectCode=" + projectCode + ", projectCode=" + projectCode
+ ", description='" + description + '\'' + ", description='" + description + '\''
+ ", globalParams='" + globalParams + '\'' + ", globalParams='" + globalParams + '\''
+ ", globalParamList=" + globalParamList + ", globalParamList=" + globalParamList
+ ", globalParamMap=" + globalParamMap + ", globalParamMap=" + globalParamMap
+ ", createTime=" + createTime + ", createTime=" + createTime
+ ", updateTime=" + updateTime + ", updateTime=" + updateTime
+ ", flag=" + flag + ", flag=" + flag
+ ", userId=" + userId + ", userId=" + userId
+ ", userName='" + userName + '\'' + ", userName='" + userName + '\''
+ ", projectName='" + projectName + '\'' + ", projectName='" + projectName + '\''
+ ", locations='" + locations + '\'' + ", locations='" + locations + '\''
+ ", receivers='" + receivers + '\'' + ", receivers='" + receivers + '\''
+ ", receiversCc='" + receiversCc + '\'' + ", receiversCc='" + receiversCc + '\''
+ ", scheduleReleaseState=" + scheduleReleaseState + ", scheduleReleaseState=" + scheduleReleaseState
+ ", timeout=" + timeout + ", timeout=" + timeout
+ ", tenantId=" + tenantId + ", tenantId=" + tenantId
+ ", modifyBy='" + modifyBy + '\'' + ", modifyBy='" + modifyBy + '\''
+ ", resourceIds='" + resourceIds + '\'' + ", warningGroupId=" + warningGroupId
+ ", operator=" + operator + ", connects='" + connects + '\''
+ ", operateTime=" + operateTime + ", resourceIds='" + resourceIds + '\''
+ '}'; + ", operator=" + operator
+ ", operateTime=" + operateTime
+ '}';
} }
} }

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java

@ -18,11 +18,8 @@
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/** /**
@ -48,4 +45,13 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
*/ */
List<ProcessDefinitionLog> queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); List<ProcessDefinitionLog> queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
* query the certain process definition version info by process definition code and version number
*
* @param processDefinitionCode process definition code
* @param version version number
* @return the process definition version info
*/
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("processDefinitionCode") Long processDefinitionCode, @Param("version") long version);
} }

27
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml

@ -20,15 +20,17 @@
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper"> <mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper">
<sql id="baseSql"> <sql id="baseSql">
pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code, id, code, name, version, description, project_code,
pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects, release_state, user_id,global_params, flag, locations, connects,
pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator, pd.operate_time, pd.create_time, warning_group_id, timeout, tenant_id,operator, operate_time, create_time,
pd.update_time, u.user_name,p.name as project_name update_time
</sql> </sql>
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog"> <select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select select pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code,
<include refid="baseSql"/> pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects,
pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator, pd.operate_time, pd.create_time,
pd.update_time, u.user_name,p.name as project_name
from t_ds_process_definition_log pd from t_ds_process_definition_log pd
JOIN t_ds_user u ON pd.user_id = u.id JOIN t_ds_user u ON pd.user_id = u.id
JOIN t_ds_project p ON pd.project_code = p.code JOIN t_ds_project p ON pd.project_code = p.code
@ -39,10 +41,17 @@
<select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog"> <select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select select
<include refid="baseSql"/> <include refid="baseSql"/>
from t_ds_process_definition_log pd from t_ds_process_definition_log
JOIN t_ds_user u ON pd.user_id = u.id
JOIN t_ds_project p ON pd.project_code = p.code
WHERE pd.code = #{processDefinitionCode} WHERE pd.code = #{processDefinitionCode}
</select> </select>
<select id="queryByProcessDefinitionCodeAndVersion"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select
<include refid="baseSql"/>
from t_ds_process_definition_log
where code = #{processDefinitionCode}
and version = #{version}
</select>
</mapper> </mapper>

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java

@ -358,7 +358,7 @@ public class ProcessDefinitionMapperTest {
@Test @Test
public void testUpdateVersionByProcessDefinitionId() { public void testUpdateVersionByProcessDefinitionId() {
long expectedVersion = 10; int expectedVersion = 10;
ProcessDefinition processDefinition = insertOne(); ProcessDefinition processDefinition = insertOne();
processDefinition.setVersion(expectedVersion); processDefinition.setVersion(expectedVersion);
processDefinitionMapper.updateVersionByProcessDefinitionId( processDefinitionMapper.updateVersionByProcessDefinitionId(

Loading…
Cancel
Save