Browse Source

Use parallelStream to improve the performance of upgrade (#13442)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
ef47e7efeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java

31
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java

@ -86,9 +86,12 @@ public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
if (CollectionUtils.isEmpty(needUpdateWorkflowInstance)) {
return;
}
for (ProcessInstance processInstance : needUpdateWorkflowInstance) {
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
needUpdateWorkflowInstance.parallelStream()
.forEach(processInstance -> {
ProcessDefinitionLog processDefinitionLog =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (processDefinitionLog != null) {
processInstance.setProjectCode(processDefinitionLog.getProjectCode());
processInstance.setTenantCode(tenantMap.get(processDefinitionLog.getTenantId()));
@ -97,14 +100,8 @@ public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
processInstance.setProjectCode(-1L);
}
processInstanceMapper.updateById(processInstance);
}
});
log.info("Success upgrade workflow instance, current batch size: {}", needUpdateWorkflowInstance.size());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
log.error("Upgrade workflow instance error", e);
}
}
}
@ -118,8 +115,10 @@ public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
if (CollectionUtils.isEmpty(taskInstances)) {
return;
}
for (TaskInstance taskInstance : taskInstances) {
ProcessInstance processInstance = processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
taskInstances.parallelStream()
.forEach(taskInstance -> {
ProcessInstance processInstance =
processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
if (processInstance == null) {
taskInstance.setProjectCode(-1L);
} else {
@ -128,14 +127,8 @@ public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
taskInstance.setExecutorName(processInstance.getExecutorName());
}
taskInstanceMapper.updateById(taskInstance);
}
});
log.info("Success upgrade task instance, current batch size: {}", taskInstances.size());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
log.error("Upgrade task instance error", e);
}
}
}

Loading…
Cancel
Save