Browse Source

[Improvement][Batch Query] Batch query ProcessDefinitions belongs to need failover ProcessInstance. (#12506)

3.2.0-release
ZhenjiLiu 2 years ago committed by GitHub
parent
commit
7cdb926a5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
  2. 31
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
  3. 18
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  4. 9
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java

@ -18,8 +18,11 @@
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import java.util.List;
import javax.annotation.Nullable;
public interface ProcessDefinitionDao {
@ -37,4 +40,10 @@ public interface ProcessDefinitionDao {
int userId,
long projectCode);
/**
* query process definitions by definition codes and versions
* @param processInstances process instances where codes and version come from
* @return
*/
List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances);
}

31
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java

@ -18,10 +18,18 @@
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@ -33,6 +41,8 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Override
public PageListingResult<ProcessDefinition> listingProcessDefinition(int pageNumber, int pageSize, String searchVal,
@ -48,4 +58,25 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
.records(processDefinitions.getRecords())
.build();
}
@Override
public List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances) {
if (Objects.isNull(processInstances) || processInstances.isEmpty()) {
return new ArrayList<>();
}
List<ProcessDefinitionLog> processDefinitionLogs = processInstances
.parallelStream()
.map(processInstance -> {
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
return processDefinitionLog;
})
.collect(Collectors.toList());
List<ProcessDefinition> processDefinitions =
processDefinitionLogs.stream().map(log -> (ProcessDefinition) log).collect(Collectors.toList());
return processDefinitions;
}
}

18
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -48,14 +49,17 @@ import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import io.micrometer.core.annotation.Counted;
@ -78,6 +82,9 @@ public class MasterFailoverService {
private final TaskInstanceDao taskInstanceDao;
@Autowired
private ProcessDefinitionDao processDefinitionDao;
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@ -153,6 +160,12 @@ public class MasterFailoverService {
needFailoverProcessInstanceList.size(),
needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
List<ProcessDefinition> processDefinitions =
processDefinitionDao.queryProcessDefinitionsByCodesAndVersions(needFailoverProcessInstanceList);
Map<Long, ProcessDefinition> codeDefinitionMap = processDefinitions
.stream()
.collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
@ -161,10 +174,7 @@ public class MasterFailoverService {
LOGGER.info("WorkflowInstance doesn't need to failover");
continue;
}
// todo: use batch query
ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
ProcessDefinition processDefinition = codeDefinitionMap.get(processInstance.getProcessDefinitionCode());
processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList =

9
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -42,6 +43,7 @@ import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@ -56,6 +58,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.context.ApplicationContext;
import org.springframework.util.ReflectionUtils;
import com.google.common.collect.Lists;
@ -92,6 +95,9 @@ public class FailoverServiceTest {
@Mock
private LogClient logClient;
@Mock
private ProcessDefinitionDao processDefinitionDao;
private static int masterPort = 5678;
private static int workerPort = 1234;
@ -113,6 +119,9 @@ public class FailoverServiceTest {
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
processInstanceExecCacheManager, logClient, taskInstanceDao);
Field processDefinitionDaoField = masterFailoverService.getClass().getDeclaredField("processDefinitionDao");
processDefinitionDaoField.setAccessible(true);
ReflectionUtils.setField(processDefinitionDaoField, masterFailoverService, processDefinitionDao);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,

Loading…
Cancel
Save