Browse Source

[Feature][JsonSplit] Fix batchMove of processDefinition bug (#5371)

* update SnowFlake

* update processDefinite from processInstance

* update processDefinite from processInstance

* Fix task logger path

* Fix dependTask bug

* Fix batchMove of processDefinition bug

* codeStyle

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
db96bf2dfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 116
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 54
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

116
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -55,6 +55,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@ -155,6 +156,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private SchedulerService schedulerService;
/**
@ -563,7 +565,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// To check resources whether they are already cancel authorized or deleted
String resourceIds = processDefinition.getResourceIds();
if (StringUtils.isNotBlank(resourceIds)) {
Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
Integer[] resourceIdArray = Arrays.stream(resourceIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new);
PermissionCheck<Integer> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger);
try {
permissionCheck.checkPermission();
@ -1463,8 +1465,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return checkResult;
}
// TODO:
// Project targetProject = projectMapper.queryDetailByCode(targetProjectCode);
Project targetProject = projectMapper.queryDetailById(targetProjectId);
if (targetProject == null) {
putMsg(result, Status.PROJECT_NOT_FOUNT, targetProjectId);
@ -1501,7 +1501,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int targetProjectId) {
Map<String, Object> result = new HashMap<>();
List<String> failedProcessList = new ArrayList<>();
//check src project auth
Map<String, Object> checkResult = checkProjectAndAuth(loginUser, projectName);
if (checkResult != null) {
@ -1513,8 +1512,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
// TODO :
// Project targetProject = projectMapper.queryDetailByCode(targetProjectCode);
Project targetProject = projectMapper.queryDetailById(targetProjectId);
if (targetProject == null) {
putMsg(result, Status.PROJECT_NOT_FOUNT, targetProjectId);
@ -1528,14 +1525,63 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
}
String[] processDefinitionIdList = processDefinitionIds.split(Constants.COMMA);
doBatchMoveProcessDefinition(targetProject, failedProcessList, processDefinitionIdList);
Integer[] definitionIds = Arrays.stream(processDefinitionIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new);
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(definitionIds);
for (ProcessDefinition processDefinition : processDefinitionList) {
ProcessDefinitionLog processDefinitionLog = moveProcessDefinition(loginUser, targetProject.getCode(), processDefinition, result, failedProcessList);
if (processDefinitionLog != null) {
moveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinitionLog);
}
}
checkBatchOperateResult(projectName, targetProject.getName(), result, failedProcessList, false);
return result;
}
private ProcessDefinitionLog moveProcessDefinition(User loginUser, Long targetProjectCode, ProcessDefinition processDefinition,
Map<String, Object> result, List<String> failedProcessList) {
try {
Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
processDefinitionLog.setVersion(version == null || version == 0 ? 1 : version + 1);
processDefinitionLog.setProjectCode(targetProjectCode);
processDefinitionLog.setOperator(loginUser.getId());
Date now = new Date();
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setUpdateTime(now);
processDefinitionLog.setCreateTime(now);
int update = processDefinitionMapper.updateById(processDefinitionLog);
int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
if ((insertLog & update) > 0) {
putMsg(result, Status.SUCCESS);
} else {
failedProcessList.add(processDefinition.getId() + "[" + processDefinition.getName() + "]");
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
return processDefinitionLog;
} catch (Exception e) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
failedProcessList.add(processDefinition.getId() + "[" + processDefinition.getName() + "]");
logger.error("move processDefinition error: {}", e.getMessage(), e);
}
return null;
}
private void moveTaskRelation(User loginUser, Long projectCode, ProcessDefinitionLog processDefinition) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(projectCode, processDefinition.getCode());
}
Date now = new Date();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
processTaskRelation.setProjectCode(processDefinition.getProjectCode());
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelation.setCreateTime(now);
processTaskRelation.setUpdateTime(now);
processService.saveTaskRelation(loginUser, processTaskRelation);
}
}
/**
* switch the defined process definition verison
*
@ -1585,30 +1631,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
/**
* do batch move process definition
*
* @param targetProject targetProject
* @param failedProcessList failedProcessList
* @param processDefinitionIdList processDefinitionIdList
*/
private void doBatchMoveProcessDefinition(Project targetProject, List<String> failedProcessList, String[] processDefinitionIdList) {
for (String processDefinitionId : processDefinitionIdList) {
try {
Map<String, Object> moveProcessDefinitionResult =
moveProcessDefinition(Integer.valueOf(processDefinitionId), targetProject);
if (!Status.SUCCESS.equals(moveProcessDefinitionResult.get(Constants.STATUS))) {
setFailedProcessList(failedProcessList, processDefinitionId);
logger.error((String) moveProcessDefinitionResult.get(Constants.MSG));
}
} catch (Exception e) {
setFailedProcessList(failedProcessList, processDefinitionId);
logger.error("move processDefinition error: {}", e.getMessage(), e);
}
}
}
/**
* batch copy process definition
*
@ -1668,34 +1690,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return null;
}
/**
* move process definition
*
* @param processId processId
* @param targetProject targetProject
* @return move result code
*/
private Map<String, Object> moveProcessDefinition(Integer processId,
Project targetProject) {
Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
return result;
}
processDefinition.setProjectId(targetProject.getId());
processDefinition.setUpdateTime(new Date());
if (processDefinitionMapper.updateById(processDefinition) > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
return result;
}
/**
* check batch operate result
*

54
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2370,16 +2370,20 @@ public class ProcessService {
}
}
for (ProcessTaskRelation processTaskRelation : builderRelationList) {
processTaskRelationMapper.insert(processTaskRelation);
// save process task relation log
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLogMapper.insert(processTaskRelationLog);
saveTaskRelation(operator, processTaskRelation);
}
return 0;
}
public void saveTaskRelation(User operator, ProcessTaskRelation processTaskRelation) {
processTaskRelationMapper.insert(processTaskRelation);
// save process task relation log
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(new Date());
processTaskRelationLogMapper.insert(processTaskRelationLog);
}
public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
Date now = new Date();
taskDefinition.setProjectCode(projectCode);
@ -2498,44 +2502,6 @@ public class ProcessService {
return new ArrayList<>(taskNodeMap.values());
}
/**
* getTaskNodeFromTaskInstance
* return null if task definition do not exists
*
* @param taskInstance
* @return
*/
public TaskNode getTaskNodeFromTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = new TaskNode();
ProcessInstance processInstance = processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
if (taskDefinition == null) {
return null;
}
List<ProcessTaskRelationLog> taskRelationList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()
);
Map<Long, Integer> taskCodeMap = new HashedMap();
taskRelationList.forEach(relation -> taskCodeMap.putIfAbsent(relation.getPostTaskCode(), relation.getPostTaskVersion()));
taskNode.setCode(taskDefinition.getCode());
taskNode.setVersion(taskDefinition.getVersion());
taskNode.setName(taskDefinition.getName());
taskNode.setName(taskDefinition.getName());
taskNode.setDesc(taskDefinition.getDescription());
taskNode.setType(taskDefinition.getTaskType());
taskNode.setRunFlag(taskDefinition.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : Constants.FLOWNODE_RUN_FLAG_NORMAL);
taskNode.setMaxRetryTimes(taskDefinition.getFailRetryTimes());
taskNode.setRetryInterval(taskDefinition.getFailRetryInterval());
taskNode.setParams(taskDefinition.getTaskParams());
taskNode.setTaskInstancePriority(taskDefinition.getTaskPriority());
taskNode.setWorkerGroup(taskDefinition.getWorkerGroup());
return taskNode;
}
/**
* find task definition by code and verision
*

Loading…
Cancel
Save