diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java index 32e6b011d6..d33c9017d1 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java @@ -86,25 +86,22 @@ public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader { if (CollectionUtils.isEmpty(needUpdateWorkflowInstance)) { return; } - for (ProcessInstance processInstance : needUpdateWorkflowInstance) { - ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion( - processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); - if (processDefinitionLog != null) { - processInstance.setProjectCode(processDefinitionLog.getProjectCode()); - processInstance.setTenantCode(tenantMap.get(processDefinitionLog.getTenantId())); - processInstance.setExecutorName(userMap.get(processInstance.getExecutorId())); - } else { - processInstance.setProjectCode(-1L); - } - processInstanceMapper.updateById(processInstance); - } + needUpdateWorkflowInstance.parallelStream() + .forEach(processInstance -> { + ProcessDefinitionLog processDefinitionLog = + processDefinitionLogMapper.queryByDefinitionCodeAndVersion( + processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + if (processDefinitionLog != null) { + processInstance.setProjectCode(processDefinitionLog.getProjectCode()); + processInstance.setTenantCode(tenantMap.get(processDefinitionLog.getTenantId())); + processInstance.setExecutorName(userMap.get(processInstance.getExecutorId())); + } else { + processInstance.setProjectCode(-1L); + } + processInstanceMapper.updateById(processInstance); + }); log.info("Success upgrade workflow instance, current batch size: {}", needUpdateWorkflowInstance.size()); - - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - log.error("Upgrade workflow instance error", e); - } } } @@ -118,24 +115,20 @@ public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader { if (CollectionUtils.isEmpty(taskInstances)) { return; } - for (TaskInstance taskInstance : taskInstances) { - ProcessInstance processInstance = processInstanceMapper.selectById(taskInstance.getProcessInstanceId()); - if (processInstance == null) { - taskInstance.setProjectCode(-1L); - } else { - taskInstance.setProjectCode(processInstance.getProjectCode()); - taskInstance.setProcessInstanceName(processInstance.getName()); - taskInstance.setExecutorName(processInstance.getExecutorName()); - } - taskInstanceMapper.updateById(taskInstance); - } - + taskInstances.parallelStream() + .forEach(taskInstance -> { + ProcessInstance processInstance = + processInstanceMapper.selectById(taskInstance.getProcessInstanceId()); + if (processInstance == null) { + taskInstance.setProjectCode(-1L); + } else { + taskInstance.setProjectCode(processInstance.getProjectCode()); + taskInstance.setProcessInstanceName(processInstance.getName()); + taskInstance.setExecutorName(processInstance.getExecutorName()); + } + taskInstanceMapper.updateById(taskInstance); + }); log.info("Success upgrade task instance, current batch size: {}", taskInstances.size()); - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - log.error("Upgrade task instance error", e); - } } }