Browse Source

[Refactor]migrate some task dao functions from ProcessServiceImpl to dao modules. (#12505)

* migrate some task DAO to dao modules.

* Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>
3.2.0-release
Yann Ann 2 years ago committed by GitHub
parent
commit
04aa125ba2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
  2. 27
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  4. 16
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
  5. 9
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 12
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  7. 49
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java
  8. 44
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
  9. 44
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
  10. 90
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
  11. 52
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java
  12. 87
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
  13. 91
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
  14. 170
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
  15. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  16. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
  17. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
  18. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
  19. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
  20. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java
  21. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
  22. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
  23. 35
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  24. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  25. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
  26. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  27. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  28. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
  29. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
  30. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
  31. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  32. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
  33. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
  34. 24
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
  35. 23
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  36. 31
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  37. 8
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  38. 21
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
  39. 12
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  40. 12
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
  41. 16
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
  42. 11
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  43. 35
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  44. 378
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  45. 32
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java

@ -33,9 +33,9 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
@ -60,7 +60,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
private static final String LOG_HEAD_FORMAT = "[LOG-PATH]: %s, [HOST]: %s%s";
@Autowired
private ProcessService processService;
private TaskInstanceDao taskInstanceDao;
@Autowired
private LogClient logClient;
@ -86,7 +86,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
@SuppressWarnings("unchecked")
public Result<ResponseTaskLog> queryLog(int taskInstId, int skipLineNum, int limit) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId);
if (taskInstance == null) {
logger.error("Task instance does not exist, taskInstanceId:{}.", taskInstId);
@ -111,7 +111,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
*/
@Override
public byte[] getLogBytes(int taskInstId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
@ -138,7 +138,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
return result;
}
// check whether the task instance can be found
TaskInstance task = processService.findTaskInstanceById(taskInstId);
TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
@ -171,7 +171,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
throw new ServiceException("user has no permission");
}
// check whether the task instance can be found
TaskInstance task = processService.findTaskInstanceById(taskInstId);
TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
throw new ServiceException("task instance is null or host is null");
}

27
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -103,6 +103,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@ -200,6 +201,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private ProcessService processService;
@Autowired
private TaskDefinitionLogDao taskDefinitionLogDao;
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@ -388,9 +392,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (insertVersion == 0) {
logger.error("Save process definition error, processCode:{}.", processDefinition.getCode());
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
} else
} else {
logger.info("Save process definition complete, processCode:{}, processVersion:{}.",
processDefinition.getCode(), insertVersion);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(),
insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
@ -398,9 +403,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
} else
} else {
logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
}
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
@ -885,9 +891,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.error("Update process definition error, processCode:{}.", processDefinition.getCode());
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} else
} else {
logger.info("Update process definition complete, processCode:{}, processVersion:{}.",
processDefinition.getCode(), insertVersion);
}
taskUsedInOtherTaskValid(processDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
@ -1136,9 +1143,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
projectCode, code, schedule.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
} else
} else {
logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, schedule.getId());
}
schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
}
@ -1184,8 +1192,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (CollectionUtils.isNotEmpty(dagDataSchedules)) {
logger.info("Start download process definition file, processDefinitionCodes:{}.", defineCodeSet);
downloadProcessDefinitionFile(response, dagDataSchedules);
} else
} else {
logger.error("There is no exported process dag data.");
}
}
/**
@ -1856,7 +1865,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinitions.get(0).getVersion());
// query task definition log
List<TaskDefinitionLog> taskDefinitionLogsList = processService.genTaskDefineList(processTaskRelations);
List<TaskDefinitionLog> taskDefinitionLogsList =
taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations);
List<DependentSimplifyDefinition> taskDefinitionList = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogsList) {
@ -1906,7 +1916,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit);
processInstanceList.forEach(processInstance -> processInstance
.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
List<TaskDefinitionLog> taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper
List<TaskDefinitionLog> taskDefinitionList = taskDefinitionLogDao.getTaskDefineLogList(processTaskRelationMapper
.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()));
Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
@ -2144,7 +2154,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
String otherParamsJson = doOtherOperateProcess(loginUser, processDefinition);
if (isCopy) {
logger.info("Copy process definition...");
List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations);
List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations);
Map<Long, Long> taskCodeMap = new HashMap<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
try {

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -71,6 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@ -129,6 +130,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
ProcessService processService;
@Autowired
TaskInstanceDao taskInstanceDao;
@Autowired
ProcessInstanceMapper processInstanceMapper;
@ -366,7 +370,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(processId, processInstance.getTestFlag());
taskInstanceDao.findValidTaskListByProcessId(processId, processInstance.getTestFlag());
addDependResultForTaskList(taskInstanceList);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
@ -444,7 +448,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId);
if (taskInstance == null) {
logger.error("Task instance does not exist, projectCode:{}, taskInstanceId{}.", projectCode, taskId);
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);

16
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.HashMap;
@ -63,7 +63,7 @@ public class LoggerServiceTest {
private LoggerServiceImpl loggerService;
@Mock
private ProcessService processService;
private TaskInstanceDao taskInstanceDao;
@Mock
private ProjectMapper projectMapper;
@ -81,7 +81,7 @@ public class LoggerServiceTest {
public void testQueryDataSourceList() {
TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
Result result = loggerService.queryLog(2, 1, 1);
// TASK_INSTANCE_NOT_FOUND
Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue());
@ -98,7 +98,7 @@ public class LoggerServiceTest {
// SUCCESS
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
result = loggerService.queryLog(1, 1, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
}
@ -107,7 +107,7 @@ public class LoggerServiceTest {
public void testGetLogBytes() {
TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
// task instance is null
try {
@ -146,7 +146,7 @@ public class LoggerServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
@ -156,7 +156,7 @@ public class LoggerServiceTest {
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
@ -184,7 +184,7 @@ public class LoggerServiceTest {
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG))
.thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
.thenReturn(new byte[0]);

9
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -28,6 +28,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT;
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
@ -67,6 +68,7 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.DbType;
@ -149,6 +151,9 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
@Mock
private ProcessService processService;
@Mock
private TaskDefinitionLogDao taskDefinitionLogDao;
@Mock
private ProcessInstanceService processInstanceService;
@ -756,8 +761,10 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
result.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
Mockito.when(projectService.checkProjectAndAuth(user, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode))
.thenReturn(getProcessTaskRelation());
Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(new ArrayList<>());
Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10);
Assertions.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));

12
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -57,6 +57,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
@ -104,6 +105,9 @@ public class ProcessInstanceServiceTest {
@Mock
ProcessService processService;
@Mock
TaskInstanceDao taskInstanceDao;
@Mock
ProcessInstanceDao processInstanceDao;
@ -408,7 +412,7 @@ public class ProcessInstanceServiceTest {
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId()))
.thenReturn(Optional.of(processInstance));
when(processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
when(taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
.thenReturn(taskInstanceList);
when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res);
Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1);
@ -451,7 +455,7 @@ public class ProcessInstanceServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findTaskInstanceById(1)).thenReturn(null);
when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(null);
Map<String, Object> taskNullRes =
processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1);
Assertions.assertEquals(Status.TASK_INSTANCE_NOT_EXISTS, taskNullRes.get(Constants.STATUS));
@ -461,7 +465,7 @@ public class ProcessInstanceServiceTest {
taskInstance.setTaskType("HTTP");
taskInstance.setProcessInstanceId(1);
putMsg(result, Status.SUCCESS, projectCode);
when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
@ -481,7 +485,7 @@ public class ProcessInstanceServiceTest {
subTask.setTaskType("SUB_PROCESS");
subTask.setProcessInstanceId(1);
putMsg(result, Status.SUCCESS, projectCode);
when(processService.findTaskInstanceById(subTask.getId())).thenReturn(subTask);
when(taskInstanceDao.findTaskInstanceById(subTask.getId())).thenReturn(subTask);
when(processService.findSubProcessInstance(subTask.getProcessInstanceId(), subTask.getId())).thenReturn(null);
Map<String, Object> subprocessNotExistRes =
processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1);

49
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
/**
* Process Instance Map DAO
*/
public interface ProcessInstanceMapDao {
/**
* Update process instance map
* @param processInstanceMap process instance map
* @return result
*/
int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
/**
* Create process instance map to DB.
* @param processInstanceMap process instance map
* @return result
*/
int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
/**
* find work process map by parent process id and parent task id.
* @param parentWorkProcessId parentWorkProcessId
* @param parentTaskId parentTaskId
* @return process instance map
*/
ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId);
}

44
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import java.util.List;
/**
* Task Instance DAO
*/
public interface TaskDefinitionDao {
/**
* Get list of task definition by process definition code
* @param processDefinitionCode process definition code
* @return list of task definition
*/
List<TaskDefinition> getTaskDefinitionListByDefinition(long processDefinitionCode);
/**
* Query task definition by code and version
* @param taskCode task code
* @param taskDefinitionVersion task definition version
* @return task definition
*/
TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion);
}

44
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import java.util.List;
/**
* Task Definition Log DAO
*/
public interface TaskDefinitionLogDao {
/**
* Get task definition log list
* @param processTaskRelations list of process task relation
* @return list of task definition
*/
List<TaskDefinitionLog> getTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations);
/**
* Query task definition log list by process task relation list
* @param processTaskRelations list of task relation
* @return list of task definition log
*/
List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations);
}

90
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.List;
/**
* Task Instance DAO
*/
public interface TaskInstanceDao {
/**
* Update or Insert task instance to DB.
* ID is null -> Insert
* ID is not null -> Update
* @param taskInstance task instance
* @return result
*/
boolean upsertTaskInstance(TaskInstance taskInstance);
/**
* Insert task instance to DB.
* @param taskInstance task instance
* @return result
*/
boolean insertTaskInstance(TaskInstance taskInstance);
/**
* Update task instance to DB.
* @param taskInstance task instance
* @return result
*/
boolean updateTaskInstance(TaskInstance taskInstance);
/**
* Submit a task instance to DB.
* @param taskInstance task instance
* @param processInstance process instance
* @return task instance
*/
TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
/**
* Query list of valid task instance by process instance id
* @param processInstanceId processInstanceId
* @param testFlag test flag
* @return list of valid task instance
*/
List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag);
/**
* find previous task list by work process id
* @param processInstanceId processInstanceId
* @return task instance list
*/
List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId);
/**
* find task instance by id
* @param taskId task id
* @return task instance
*/
TaskInstance findTaskInstanceById(Integer taskId);
/**
* find task instance list by id list
* @param idList task id list
* @return task instance list
*/
List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList);
}

52
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
import lombok.NonNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
/**
* Process Instance Map Dao implementation
*/
@Repository
public class ProcessInstanceMapDaoImpl implements ProcessInstanceMapDao {
@Autowired
private ProcessInstanceMapMapper processInstanceMapMapper;
@Override
public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
return processInstanceMapMapper.updateById(processInstanceMap);
}
@Override
public int createWorkProcessInstanceMap(@NonNull ProcessInstanceMap processInstanceMap) {
return processInstanceMapMapper.insert(processInstanceMap);
}
@Override
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
}
}

87
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import com.google.common.collect.Lists;
/**
* Task Definition DAO Implementation
*/
@Repository
public class TaskDefinitionDaoImpl implements TaskDefinitionDao {
private final Logger logger = LoggerFactory.getLogger(TaskDefinitionDaoImpl.class);
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Override
public List<TaskDefinition> getTaskDefinitionListByDefinition(long processDefinitionCode) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
logger.error("Cannot find process definition, code: {}", processDefinitionCode);
return Lists.newArrayList();
}
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper
.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
processTaskRelations.stream().filter(p -> p.getPostTaskCode() > 0)
.forEach(p -> taskDefinitionSet.add(new TaskDefinition(p.getPostTaskCode(), p.getPostTaskVersion())));
if (taskDefinitionSet.isEmpty()) {
return Lists.newArrayList();
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
return Lists.newArrayList(taskDefinitionLogs);
}
@Override
public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
}
}

91
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import com.google.common.collect.Lists;
/**
* Task Definition Log DAP implementation
*/
@Repository
public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao {
private final Logger logger = LoggerFactory.getLogger(TaskDefinitionLogDaoImpl.class);
@Autowired
private TaskDefinitionDao taskDefinitionDao;
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Override
public List<TaskDefinitionLog> getTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations) {
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(),
processTaskRelation.getPreTaskVersion()));
}
if (processTaskRelation.getPostTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion()));
}
}
if (taskDefinitionSet.isEmpty()) {
return Lists.newArrayList();
}
return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
@Override
public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
}
if (processTaskRelation.getPostTaskCode() > 0) {
taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
}
}
taskCodeVersionMap.forEach((code, version) -> {
taskDefinitionLogs.add((TaskDefinitionLog) taskDefinitionDao.findTaskDefinition(code, version));
});
return taskDefinitionLogs;
}
}

170
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
/**
* Task Instance DAO implementation
*/
@Repository
public class TaskInstanceDaoImpl implements TaskInstanceDao {
private final Logger logger = LoggerFactory.getLogger(TaskInstanceDaoImpl.class);
@Autowired
private TaskInstanceMapper taskInstanceMapper;
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Autowired
private ProcessInstanceMapDao processInstanceMapDao;
@Override
public boolean upsertTaskInstance(TaskInstance taskInstance) {
if (taskInstance.getId() != null) {
return updateTaskInstance(taskInstance);
} else {
return insertTaskInstance(taskInstance);
}
}
@Override
public boolean insertTaskInstance(TaskInstance taskInstance) {
int count = taskInstanceMapper.insert(taskInstance);
return count > 0;
}
@Override
public boolean updateTaskInstance(TaskInstance taskInstance) {
int count = taskInstanceMapper.updateById(taskInstance);
return count > 0;
}
@Override
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
WorkflowExecutionStatus processInstanceState = processInstance.getState();
if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) {
logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}",
processInstance.getId(),
processInstanceState,
taskInstance.getTaskCode());
return null;
}
if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) {
taskInstance.setState(TaskExecutionStatus.PAUSE);
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
if (taskInstance.getSubmitTime() == null) {
taskInstance.setSubmitTime(new Date());
}
if (taskInstance.getFirstSubmitTime() == null) {
taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
}
boolean saveResult = upsertTaskInstance(taskInstance);
if (!saveResult) {
return null;
}
return taskInstance;
}
private TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
TaskExecutionStatus state = taskInstance.getState();
if (state == TaskExecutionStatus.RUNNING_EXECUTION
|| state == TaskExecutionStatus.DELAY_EXECUTION
|| state == TaskExecutionStatus.KILL
|| state == TaskExecutionStatus.DISPATCH) {
return state;
}
if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
state = TaskExecutionStatus.PAUSE;
} else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP
|| !checkProcessStrategy(taskInstance, processInstance)) {
state = TaskExecutionStatus.KILL;
} else {
state = TaskExecutionStatus.SUBMITTED_SUCCESS;
}
return state;
}
private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
FailureStrategy failureStrategy = processInstance.getFailureStrategy();
if (failureStrategy == FailureStrategy.CONTINUE) {
return true;
}
List<TaskInstance> taskInstances =
this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
if (task.getState() == TaskExecutionStatus.FAILURE
&& task.getRetryTimes() >= task.getMaxRetryTimes()) {
return false;
}
}
return true;
}
@Override
public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag) {
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag);
}
@Override
public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
processInstance.getTestFlag());
}
@Override
public TaskInstance findTaskInstanceById(Integer taskId) {
return taskInstanceMapper.selectById(taskId);
}
@Override
public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
if (CollectionUtils.isEmpty(idList)) {
return new ArrayList<>();
}
return taskInstanceMapper.selectBatchIds(idList);
}
}

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
@ -36,7 +37,6 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
@ -75,11 +75,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
@Autowired
private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
/**
* processService
*/
@Autowired
private ProcessService processService;
private TaskInstanceDao taskInstanceDao;
/**
* executor dispatcher
@ -264,7 +261,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
* @return taskInstance is final state
*/
public boolean taskInstanceIsFinalState(int taskInstanceId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
return taskInstance.getState().isFinished();
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -46,6 +47,9 @@ public class TaskDelayEventHandler implements TaskEventHandler {
@Autowired
private ProcessService processService;
@Autowired
private TaskInstanceDao taskInstanceDao;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@ -85,7 +89,7 @@ public class TaskDelayEventHandler implements TaskEventHandler {
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
if (!processService.updateTaskInstance(taskInstance)) {
if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed");
}
sendAckToWorker(taskEvent);

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java

@ -19,12 +19,12 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,7 +40,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler {
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private ProcessService processService;
private TaskInstanceDao taskInstanceDao;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
@ -68,7 +68,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler {
taskInstance.setState(TaskExecutionStatus.DISPATCH);
taskInstance.setHost(taskEvent.getWorkerAddress());
try {
if (!processService.updateTaskInstance(taskInstance)) {
if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed");
}
} catch (Exception ex) {

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -50,6 +51,9 @@ public class TaskResultEventHandler implements TaskEventHandler {
@Autowired
private ProcessService processService;
@Autowired
private TaskInstanceDao taskInstanceDao;
@Autowired
private MasterConfig masterConfig;
@ -92,7 +96,7 @@ public class TaskResultEventHandler implements TaskEventHandler {
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
processService.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
sendAckToWorker(taskEvent);
} catch (Exception ex) {
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java

@ -20,13 +20,13 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Optional;
@ -47,7 +47,7 @@ public class TaskRunningEventHandler implements TaskEventHandler {
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
private ProcessService processService;
private TaskInstanceDao taskInstanceDao;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
@ -84,7 +84,7 @@ public class TaskRunningEventHandler implements TaskEventHandler {
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
if (!processService.updateTaskInstance(taskInstance)) {
if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed");
}
sendAckToWorker(taskEvent);

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java

@ -19,13 +19,13 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteStartCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteThreadPool;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,7 +47,7 @@ public class TaskExecuteStartProcessor implements NettyRequestProcessor {
private StreamTaskExecuteThreadPool streamTaskExecuteThreadPool;
@Autowired
private ProcessService processService;
private TaskDefinitionDao taskDefinitionDao;
@Override
public void process(Channel channel, Command command) {
@ -57,7 +57,7 @@ public class TaskExecuteStartProcessor implements NettyRequestProcessor {
JSONUtils.parseObject(command.getBody(), TaskExecuteStartCommand.class);
logger.info("taskExecuteStartCommand: {}", taskExecuteStartCommand);
TaskDefinition taskDefinition = processService.findTaskDefinition(
TaskDefinition taskDefinition = taskDefinitionDao.findTaskDefinition(
taskExecuteStartCommand.getTaskDefinitionCode(), taskExecuteStartCommand.getTaskDefinitionVersion());
if (taskDefinition == null) {
logger.error("Task definition can not be found, taskDefinitionCode:{}, taskDefinitionVersion:{}",

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@ -73,6 +75,12 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private TaskInstanceDao taskInstanceDao;
@Autowired
private TaskDefinitionLogDao taskDefinitionLogDao;
@Autowired
private MasterConfig masterConfig;
@ -183,7 +191,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
processAlertManager,
masterConfig,
stateWheelExecuteThread,
curingGlobalParamsService);
curingGlobalParamsService,
taskInstanceDao,
taskDefinitionLogDao);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -89,6 +90,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
protected ProcessService processService;
protected TaskInstanceDao taskInstanceDao;
protected ExecutorDispatcher dispatcher;
protected ProcessTaskRelationMapper processTaskRelationMapper;
@ -118,6 +121,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
this.dispatcher = SpringApplicationContext.getBean(ExecutorDispatcher.class);
this.taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
this.processTaskRelationMapper = SpringApplicationContext.getBean(ProcessTaskRelationMapper.class);
this.taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
this.streamTaskInstanceExecCacheManager =
SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class);
this.taskDefinition = taskDefinition;
@ -133,7 +137,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
// submit task
processService.updateTaskDefinitionResources(taskDefinition);
taskInstance = newTaskInstance(taskDefinition);
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
// add cache
streamTaskInstanceExecCacheManager.cache(taskInstance.getId(), this);
@ -148,7 +152,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
if (taskExecutionContext == null) {
taskInstance.setState(TaskExecutionStatus.FAILURE);
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
return;
}
@ -175,7 +179,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
// set task instance fail
taskInstance.setState(TaskExecutionStatus.FAILURE);
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
return;
}
@ -416,7 +420,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
processService.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
// send ack
sendAckToWorker(taskEvent);

35
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -58,6 +58,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -134,6 +136,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private ProcessInstanceDao processInstanceDao;
private TaskInstanceDao taskInstanceDao;
private TaskDefinitionLogDao taskDefinitionLogDao;
private final ProcessAlertManager processAlertManager;
private final NettyExecutorManager nettyExecutorManager;
@ -245,7 +251,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
@NonNull ProcessAlertManager processAlertManager,
@NonNull MasterConfig masterConfig,
@NonNull StateWheelExecuteThread stateWheelExecuteThread,
@NonNull CuringParamsService curingParamsService) {
@NonNull CuringParamsService curingParamsService,
@NonNull TaskInstanceDao taskInstanceDao,
@NonNull TaskDefinitionLogDao taskDefinitionLogDao) {
this.processService = processService;
this.commandService = commandService;
this.processInstanceDao = processInstanceDao;
@ -254,6 +262,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.curingParamsService = curingParamsService;
this.taskInstanceDao = taskInstanceDao;
this.taskDefinitionLogDao = taskDefinitionLogDao;
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
}
@ -347,7 +357,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.DISPATCH);
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
@ -357,7 +367,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
if (acquireTaskGroup) {
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.DISPATCH);
return true;
@ -502,7 +512,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
public void refreshTaskInstance(int taskInstanceId) {
logger.info("task instance update: {} ", taskInstanceId);
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
if (taskInstance == null) {
logger.error("can not find task instance, id:{}", taskInstanceId);
return;
@ -775,7 +785,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
List<ProcessTaskRelation> processTaskRelations =
processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogs =
processService.getTaskDefineLogListByRelation(processTaskRelations);
taskDefinitionLogDao.getTaskDefineLogListByRelation(processTaskRelations);
List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
forbiddenTaskMap.clear();
@ -815,7 +825,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
processInstance.getRunTimes(),
processInstance.getRecovery());
List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
processInstance.getTestFlag());
for (TaskInstance task : validTaskInstanceList) {
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
@ -831,7 +842,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) {
task.setFlag(Flag.NO);
processService.updateTaskInstance(task);
taskInstanceDao.updateTaskInstance(task);
continue;
}
}
@ -852,7 +863,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (task.getState().isNeedFaultTolerance()) {
logger.info("TaskInstance needs fault tolerance, will be added to standby list.");
task.setFlag(Flag.NO);
processService.updateTaskInstance(task);
taskInstanceDao.updateTaskInstance(task);
// tolerantTaskInstance add to standby list directly
TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
@ -953,7 +964,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (taskInstance.getId() != oldTaskInstanceId) {
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
oldTaskInstance.setFlag(Flag.NO);
processService.updateTaskInstance(oldTaskInstance);
taskInstanceDao.updateTaskInstance(oldTaskInstance);
validTaskMap.remove(taskInstance.getTaskCode());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
}
@ -1780,7 +1791,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (taskInstanceId == null || taskInstanceId.equals(0)) {
continue;
}
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
if (taskInstance == null || taskInstance.getState().isFinished()) {
continue;
}
@ -1813,7 +1824,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
// stop tasks which is retrying if forced success happens
if (task.taskCanRetry()) {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
TaskInstance retryTask = taskInstanceDao.findTaskInstanceById(task.getId());
if (retryTask != null && retryTask.getState().isForceSuccess()) {
task.setState(retryTask.getState());
logger.info(
@ -1888,7 +1899,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
.map(Integer::valueOf)
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) {
return processService.findTaskInstanceByIdList(startTaskInstanceIds);
return taskInstanceDao.findTaskInstanceByIdList(startTaskInstanceIds);
}
}
return Collections.emptyList();

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -49,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
@ -123,6 +124,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessService processService;
protected TaskInstanceDao taskInstanceDao;
protected ProcessInstanceDao processInstanceDao;
protected MasterConfig masterConfig;
@ -140,6 +143,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class);
taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
this.taskInstance = taskInstance;
this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
@ -306,7 +310,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
if (verifyTenantIsNull(tenant, taskInstance)) {
logger.info("Task state changes to {}", TaskExecutionStatus.FAILURE);
taskInstance.setState(TaskExecutionStatus.FAILURE);
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java

@ -77,7 +77,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
this.taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
this.taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.taskInstanceDao.upsertTaskInstance(taskInstance);
this.dependentParameters = taskInstance.getDependency();
this.blockingParam = JSONUtils.parseObject(taskInstance.getTaskParams(), BlockingParameters.class);
}
@ -87,7 +87,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
// todo: task cannot be pause
taskInstance.setState(TaskExecutionStatus.PAUSE);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
logger.info("blocking task has been paused");
return true;
}
@ -96,7 +96,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
taskInstance.setState(TaskExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
logger.info("blocking task has been killed");
return true;
}
@ -171,7 +171,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
private void setConditionResult() {
List<TaskInstance> taskInstances = processService
List<TaskInstance> taskInstances = taskInstanceDao
.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
@ -204,7 +204,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
}
taskInstance.setState(TaskExecutionStatus.SUCCESS);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
logger.info("blocking task execute complete, blocking:{}", isBlocked);
}
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -146,7 +146,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
public boolean killTask() {
try {
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
taskInstance = taskInstanceDao.findTaskInstanceById(taskInstance.getId());
if (taskInstance == null) {
return true;
}
@ -156,7 +156,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
// we don't wait the kill response
taskInstance.setState(TaskExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
if (StringUtils.isNotEmpty(taskInstance.getHost())) {
killRemoteTask();
}

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -99,7 +99,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
protected boolean pauseTask() {
this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@ -120,7 +120,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(TaskExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@ -138,13 +138,13 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.taskInstanceDao.upsertTaskInstance(taskInstance);
this.dependentParameters = taskInstance.getDependency();
}
private void setConditionResult() {
List<TaskInstance> taskInstances = processService
List<TaskInstance> taskInstances = taskInstanceDao
.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
@ -194,6 +194,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
(conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
}
}

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java

@ -114,7 +114,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
initDependParameters();
logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate);
return true;
@ -231,7 +231,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
protected boolean pauseTask() {
this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@ -239,7 +239,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(TaskExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@ -290,7 +290,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
status = (result == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
}
@Override

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@ -133,7 +133,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
taskInstance.setState(TaskExecutionStatus.of(subProcessInstance.getState().getCode()));
taskInstance.setEndTime(new Date());
dealFinish();
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
}
}
@ -201,7 +201,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
logger.info("set sub work flow {} task {} state: {}",
processInstance.getId(),
taskInstance.getId(),

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -80,7 +80,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
if (!this.taskInstance().getState().isFinished()) {
setSwitchResult();
@ -104,7 +104,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected boolean pauseTask() {
this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@ -112,7 +112,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(TaskExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@ -127,7 +127,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
}
private boolean setSwitchResult() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
List<TaskInstance> taskInstances = taskInstanceDao.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
Map<String, TaskExecutionStatus> completeTaskList = new HashMap<>();
for (TaskInstance task : taskInstances) {
@ -188,7 +188,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
(conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
taskInstance.setEndTime(new Date());
taskInstance.setState(status);
processService.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
}
public String setTaskParams(String content, String rgex) {

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
@ -75,12 +76,15 @@ public class MasterFailoverService {
private final LogClient logClient;
private final TaskInstanceDao taskInstanceDao;
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull NettyExecutorManager nettyExecutorManager,
@NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager,
@NonNull LogClient logClient) {
@NonNull LogClient logClient,
@NonNull TaskInstanceDao taskInstanceDao) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
@ -88,7 +92,7 @@ public class MasterFailoverService {
this.localAddress = masterConfig.getMasterAddress();
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
this.logClient = logClient;
this.taskInstanceDao = taskInstanceDao;
}
/**
@ -164,7 +168,7 @@ public class MasterFailoverService {
processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
for (TaskInstance taskInstance : taskInstanceList) {
try {
LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
@ -249,7 +253,7 @@ public class MasterFailoverService {
}
taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
}
private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) {

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
@ -73,12 +74,15 @@ public class WorkerFailoverService {
private final LogClient logClient;
private final String localAddress;
private final TaskInstanceDao taskInstanceDao;
public WorkerFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
@NonNull ProcessInstanceExecCacheManager cacheManager,
@NonNull LogClient logClient) {
@NonNull LogClient logClient,
@NonNull TaskInstanceDao taskInstanceDao) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
@ -86,6 +90,7 @@ public class WorkerFailoverService {
this.cacheManager = cacheManager;
this.logClient = logClient;
this.localAddress = masterConfig.getMasterAddress();
this.taskInstanceDao = taskInstanceDao;
}
/**
@ -183,7 +188,7 @@ public class WorkerFailoverService {
taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
taskInstance.setFlag(Flag.NO);
processService.saveTaskInstance(taskInstance);
taskInstanceDao.upsertTaskInstance(taskInstance);
TaskStateEvent stateEvent = TaskStateEvent.builder()
.processInstanceId(processInstance.getId())

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -48,6 +49,8 @@ public class DependentExecute {
*/
private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
private final TaskInstanceDao taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
/**
* depend item list
*/
@ -154,7 +157,7 @@ public class DependentExecute {
DependResult result;
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId(), testFlag);
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), testFlag);
for (TaskInstance task : taskInstanceList) {
if (task.getTaskCode() == taskCode) {

24
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java

@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@ -62,6 +64,10 @@ public class BlockingTaskTest {
private ProcessService processService;
private TaskInstanceDao taskInstanceDao;
private TaskDefinitionDao taskDefinitionDao;
private ProcessInstance processInstance;
private MasterConfig config;
@ -80,9 +86,14 @@ public class BlockingTaskTest {
// mock process service
processService = Mockito.mock(ProcessService.class);
Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
Mockito.when(SpringApplicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
// mock process instance
processInstance = getProcessInstance();
Mockito.when(processService
@ -93,7 +104,7 @@ public class BlockingTaskTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
@ -187,24 +198,23 @@ public class BlockingTaskTest {
.submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
Mockito.when(processService
Mockito.when(taskInstanceDao
.findTaskInstanceById(taskInstance.getId()))
.thenReturn(taskInstance);
// for BlockingTaskExecThread.initTaskParameters
Mockito.when(processService
.saveTaskInstance(taskInstance))
Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
.thenReturn(true);
// for BlockingTaskExecThread.updateTaskState
Mockito.when(processService
Mockito.when(taskInstanceDao
.updateTaskInstance(taskInstance))
.thenReturn(true);
// for BlockingTaskExecThread.waitTaskQuit
List<TaskInstance> conditions = getTaskInstanceForValidTaskList(expectResults);
Mockito.when(
processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
.thenReturn(conditions);
return taskInstance;
}

23
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@ -61,6 +63,10 @@ public class ConditionsTaskTest {
private ProcessInstance processInstance;
private TaskInstanceDao taskInstanceDao;
private TaskDefinitionDao taskDefinitionDao;
@BeforeEach
public void before() {
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
@ -75,6 +81,12 @@ public class ConditionsTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
processInstance = getProcessInstance();
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
@ -84,7 +96,7 @@ public class ConditionsTaskTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
@ -96,22 +108,21 @@ public class ConditionsTaskTest {
.submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
// for MasterBaseTaskExecThread.call
Mockito.when(processService
Mockito.when(taskInstanceDao
.findTaskInstanceById(taskInstance.getId()))
.thenReturn(taskInstance);
// for ConditionsTaskExecThread.initTaskParameters
Mockito.when(processService
.saveTaskInstance(taskInstance))
Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
.thenReturn(true);
// for ConditionsTaskExecThread.updateTaskState
Mockito.when(processService
Mockito.when(taskInstanceDao
.updateTaskInstance(taskInstance))
.thenReturn(true);
// for ConditionsTaskExecThread.waitTaskQuit
List<TaskInstance> conditions = Stream.of(
getTaskInstanceForValidTaskList(expectResult)).collect(Collectors.toList());
Mockito.when(processService
Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
.thenReturn(conditions);
return taskInstance;

31
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -70,6 +72,10 @@ public class DependentTaskTest {
private ProcessService processService;
private TaskInstanceDao taskInstanceDao;
private TaskDefinitionDao taskDefinitionDao;
/**
* the dependent task to be tested
* ProcessDefinition id=1
@ -95,6 +101,12 @@ public class DependentTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
processInstance = getProcessInstance();
taskInstance = getTaskInstance();
@ -110,20 +122,19 @@ public class DependentTaskTest {
.thenAnswer(i -> taskInstance);
// for DependentTaskExecThread.initTaskParameters
Mockito.when(processService
Mockito.when(taskInstanceDao
.updateTaskInstance(Mockito.any()))
.thenReturn(true);
// for DependentTaskExecThread.updateTaskState
Mockito.when(processService
.saveTaskInstance(Mockito.any()))
Mockito.when(taskInstanceDao.upsertTaskInstance(Mockito.any()))
.thenReturn(true);
// for DependentTaskExecThread.waitTaskQuit
Mockito.when(processService
Mockito.when(taskInstanceDao
.findTaskInstanceById(1000))
.thenAnswer(i -> taskInstance);
Mockito.when(processService.findTaskDefinition(TASK_CODE, TASK_VERSION))
Mockito.when(taskDefinitionDao.findTaskDefinition(TASK_CODE, TASK_VERSION))
.thenReturn(getTaskDefinition());
}
@ -155,7 +166,7 @@ public class DependentTaskTest {
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(200, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_A,
@ -177,7 +188,7 @@ public class DependentTaskTest {
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(200, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A,
@ -228,13 +239,13 @@ public class DependentTaskTest {
.thenReturn(processInstance300);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(200, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A,
processInstance200))
.collect(Collectors.toList()));
Mockito.when(processService
Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(300, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(3000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_B,
@ -321,7 +332,7 @@ public class DependentTaskTest {
// DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(200, 0))
.thenAnswer(i -> {
processInstance.setState(WorkflowExecutionStatus.READY_STOP);

8
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@ -55,6 +56,8 @@ public class SubProcessTaskTest {
private ProcessService processService;
private TaskInstanceDao taskInstanceDao;
private ProcessInstance processInstance;
private MockedStatic<ServerLifeCycleManager> mockedStaticServerLifeCycleManager;
@ -72,11 +75,14 @@ public class SubProcessTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
Mockito.when(SpringApplicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
mockedStaticServerLifeCycleManager = Mockito.mockStatic(ServerLifeCycleManager.class);
Mockito.when(ServerLifeCycleManager.isStopped()).thenReturn(false);
processInstance = getProcessInstance();
Mockito.when(processService
Mockito.when(taskInstanceDao
.updateTaskInstance(Mockito.any()))
.thenReturn(true);

21
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java

@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@ -53,6 +55,10 @@ public class SwitchTaskTest {
private ProcessInstance processInstance;
private TaskInstanceDao taskInstanceDao;
private TaskDefinitionDao taskDefinitionDao;
@BeforeEach
public void before() {
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
@ -67,6 +73,12 @@ public class SwitchTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
processInstance = getProcessInstance();
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
@ -78,7 +90,7 @@ public class SwitchTaskTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance);
@ -87,15 +99,14 @@ public class SwitchTaskTest {
.submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
// for MasterBaseTaskExecThread.call
Mockito.when(processService
Mockito.when(taskInstanceDao
.findTaskInstanceById(taskInstance.getId()))
.thenReturn(taskInstance);
// for SwitchTaskExecThread.initTaskParameters
Mockito.when(processService
.saveTaskInstance(taskInstance))
Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
.thenReturn(true);
// for SwitchTaskExecThread.updateTaskState
Mockito.when(processService
Mockito.when(taskInstanceDao
.updateTaskInstance(taskInstance))
.thenReturn(true);

12
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -60,6 +61,9 @@ public class TaskPriorityQueueConsumerTest {
@Autowired
private ProcessService processService;
@Autowired
private TaskInstanceDao taskInstanceDao;
@Autowired
private ExecutorDispatcher dispatcher;
@ -236,7 +240,7 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
Assertions.assertNotNull(state);
@ -264,7 +268,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
taskPriorityQueue.put(taskPriority);
@ -302,7 +306,7 @@ public class TaskPriorityQueueConsumerTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority();
taskPriority.setTaskId(1);
@ -333,7 +337,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
taskPriorityQueue.put(taskPriority);

12
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@ -71,6 +73,9 @@ public class WorkflowExecuteRunnableTest {
private ProcessInstance processInstance;
private TaskInstanceDao taskInstanceDao;
private TaskDefinitionLogDao taskDefinitionLogDao;
private ProcessService processService;
private CommandService commandService;
@ -96,6 +101,8 @@ public class WorkflowExecuteRunnableTest {
commandService = Mockito.mock(CommandService.class);
processInstanceDao = Mockito.mock(ProcessInstanceDao.class);
processInstance = Mockito.mock(ProcessInstance.class);
taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
taskDefinitionLogDao = Mockito.mock(TaskDefinitionLogDao.class);
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00");
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00");
@ -111,7 +118,8 @@ public class WorkflowExecuteRunnableTest {
workflowExecuteThread = Mockito.spy(
new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao,
nettyExecutorManager,
processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService,
taskInstanceDao, taskDefinitionLogDao));
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
dag.set(workflowExecuteThread, new DAG());
@ -146,7 +154,7 @@ public class WorkflowExecuteRunnableTest {
taskInstance4.setId(4);
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
Mockito.when(processService.findTaskInstanceByIdList(
Mockito.when(taskInstanceDao.findTaskInstanceByIdList(
Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(),
taskInstance4.getId())))
.thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));

16
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java

@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@ -56,6 +58,10 @@ public class CommonTaskProcessorTest {
private ProcessService processService;
private TaskInstanceDao taskInstanceDao;
private TaskDefinitionDao taskDefinitionDao;
private CommonTaskProcessor commonTaskProcessor;
@BeforeEach
@ -69,6 +75,12 @@ public class CommonTaskProcessorTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
commonTaskProcessor = Mockito.mock(CommonTaskProcessor.class);
Mockito.when(applicationContext.getBean(CommonTaskProcessor.class)).thenReturn(commonTaskProcessor);
@ -76,7 +88,7 @@ public class CommonTaskProcessorTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
@ -107,7 +119,7 @@ public class CommonTaskProcessorTest {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance);
Assertions.assertNull(taskExecutionContext);
}

11
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@ -73,6 +74,9 @@ public class FailoverServiceTest {
@Mock
private ProcessService processService;
@Mock
private TaskInstanceDao taskInstanceDao;
@Mock
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@ -108,13 +112,14 @@ public class FailoverServiceTest {
given(masterConfig.getMasterAddress()).willReturn(testMasterHost);
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
processInstanceExecCacheManager, logClient);
processInstanceExecCacheManager, logClient, taskInstanceDao);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,
workflowExecuteThreadPool,
cacheManager,
logClient);
logClient,
taskInstanceDao);
failoverService = new FailoverService(masterFailoverService, workerFailoverService);
@ -150,7 +155,7 @@ public class FailoverServiceTest {
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString()))
.willReturn(Arrays.asList(processInstance));
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
given(processService.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt()))
given(taskInstanceDao.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt()))
.willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
Thread.sleep(1000);

35
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
@ -70,8 +69,6 @@ public interface ProcessService {
Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);
ProcessInstance findProcessInstanceById(int processId);
ProcessDefinition findProcessDefineById(int processDefinitionId);
@ -104,36 +101,12 @@ public interface ProcessService {
void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task);
TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance);
boolean saveTaskInstance(TaskInstance taskInstance);
boolean createTaskInstance(TaskInstance taskInstance);
boolean updateTaskInstance(TaskInstance taskInstance);
TaskInstance findTaskInstanceById(Integer taskId);
List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList);
void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance);
void updateTaskDefinitionResources(TaskDefinition taskDefinition);
List<Integer> findTaskIdByInstanceState(int instanceId, TaskExecutionStatus state);
List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag);
List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId);
int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId);
int deleteWorkProcessMapByParentId(int parentWorkProcessId);
ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId);
@ -157,8 +130,6 @@ public interface ProcessService {
@Transactional
void processNeedFailoverProcessInstances(ProcessInstance processInstance);
List<TaskInstance> queryNeedFailoverTaskInstances(String host);
DataSource findDataSourceById(int id);
ProcessInstance findProcessInstanceByTaskId(int taskId);
@ -216,12 +187,6 @@ public interface ProcessService {
DagData genDagData(ProcessDefinition processDefinition);
List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations);
List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations);
TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion);
List<ProcessTaskRelation> findRelationByCode(long processDefinitionCode, int processDefinitionVersion);
List<TaskNode> transformTask(List<ProcessTaskRelation> taskRelationList,

378
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
@ -110,6 +109,10 @@ import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -193,6 +196,18 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private TaskDefinitionDao taskDefinitionDao;
@Autowired
private TaskInstanceDao taskInstanceDao;
@Autowired
private TaskDefinitionLogDao taskDefinitionLogDao;
@Autowired
private ProcessInstanceMapDao processInstanceMapDao;
@Autowired
private DataSourceMapper dataSourceMapper;
@ -399,32 +414,6 @@ public class ProcessServiceImpl implements ProcessService {
return Optional.ofNullable(processInstanceMapper.queryDetailById(processId));
}
/**
* get task node list by definitionId
*/
@Override
public List<TaskDefinition> getTaskNodeListByDefinition(long defineCode) {
ProcessDefinition processDefinition = processDefineMapper.queryByCode(defineCode);
if (processDefinition == null) {
logger.error("process define not exists");
return Lists.newArrayList();
}
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper
.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPostTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion()));
}
}
if (taskDefinitionSet.isEmpty()) {
return Lists.newArrayList();
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
return Lists.newArrayList(taskDefinitionLogs);
}
/**
* find process instance by id
*
@ -516,7 +505,7 @@ public class ProcessServiceImpl implements ProcessService {
public void removeTaskLogFile(Integer processInstanceId) {
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
List<TaskInstance> taskInstanceList =
findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@ -538,7 +527,7 @@ public class ProcessServiceImpl implements ProcessService {
public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
List<TaskInstance> taskInstanceList =
findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@ -560,7 +549,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public void recurseFindSubProcess(long parentCode, List<Long> ids) {
List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinition(parentCode);
List<TaskDefinition> taskNodeList = taskDefinitionDao.getTaskDefinitionListByDefinition(parentCode);
if (taskNodeList != null && !taskNodeList.isEmpty()) {
@ -862,7 +851,7 @@ public class ProcessServiceImpl implements ProcessService {
failedList.addAll(killedList);
failedList.addAll(toleranceList);
for (Integer taskId : failedList) {
initTaskInstance(this.findTaskInstanceById(taskId));
initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId));
}
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(failedList)));
@ -880,7 +869,7 @@ public class ProcessServiceImpl implements ProcessService {
TaskExecutionStatus.KILL);
for (Integer taskId : stopNodeList) {
// initialize the pause state
initTaskInstance(this.findTaskInstanceById(taskId));
initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId));
}
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(stopNodeList)));
@ -897,10 +886,11 @@ public class ProcessServiceImpl implements ProcessService {
// delete all the valid tasks when complement data if id is not null
if (processInstance.getId() != null) {
List<TaskInstance> taskInstanceList =
this.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
processInstance.getTestFlag());
for (TaskInstance taskInstance : taskInstanceList) {
taskInstance.setFlag(Flag.NO);
this.updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
}
}
break;
@ -912,10 +902,11 @@ public class ProcessServiceImpl implements ProcessService {
}
// delete all the valid tasks when repeat running
List<TaskInstance> validTaskList =
findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
processInstance.getTestFlag());
for (TaskInstance taskInstance : validTaskList) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
}
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
@ -1067,7 +1058,7 @@ public class ProcessServiceImpl implements ProcessService {
// update sub process id to process map table
processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
this.updateWorkProcessInstanceMap(processInstanceMap);
processInstanceMapDao.updateWorkProcessInstanceMap(processInstanceMap);
}
/**
@ -1129,11 +1120,11 @@ public class ProcessServiceImpl implements ProcessService {
if (!taskInstance.isSubProcess()
&& (taskInstance.getState().isKill() || taskInstance.getState().isFailure())) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
return;
}
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
}
/**
@ -1183,7 +1174,7 @@ public class ProcessServiceImpl implements ProcessService {
taskInstance.getProcessInstanceId(),
processInstance.getState());
// submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
TaskInstance task = taskInstanceDao.submitTaskInstanceToDB(taskInstance, processInstance);
if (task == null) {
logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ",
taskInstance.getName(),
@ -1227,7 +1218,7 @@ public class ProcessServiceImpl implements ProcessService {
processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
if (processMap != null) {
processMap.setParentTaskInstanceId(parentTask.getId());
updateWorkProcessInstanceMap(processMap);
processInstanceMapDao.updateWorkProcessInstanceMap(processMap);
return processMap;
}
}
@ -1235,7 +1226,7 @@ public class ProcessServiceImpl implements ProcessService {
processMap = new ProcessInstanceMap();
processMap.setParentProcessInstanceId(parentInstance.getId());
processMap.setParentTaskInstanceId(parentTask.getId());
createWorkProcessInstanceMap(processMap);
processInstanceMapDao.createWorkProcessInstanceMap(processMap);
return processMap;
}
@ -1250,11 +1241,13 @@ public class ProcessServiceImpl implements ProcessService {
TaskInstance parentTask) {
Integer preTaskId = 0;
List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
List<TaskInstance> preTaskList =
taskInstanceDao.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
for (TaskInstance task : preTaskList) {
if (task.getName().equals(parentTask.getName())) {
preTaskId = task.getId();
ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
ProcessInstanceMap map =
processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
if (map != null) {
return map;
}
@ -1277,7 +1270,8 @@ public class ProcessServiceImpl implements ProcessService {
return;
}
// check create sub work flow firstly
ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
ProcessInstanceMap instanceMap =
processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
if (null != instanceMap
&& CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
// recover failover tolerance would not create a new command when the sub command already have been created
@ -1347,165 +1341,6 @@ public class ProcessServiceImpl implements ProcessService {
}
}
/**
* submit task to mysql
*
* @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
@Override
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
WorkflowExecutionStatus processInstanceState = processInstance.getState();
if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) {
logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}",
processInstance.getId(),
processInstanceState,
taskInstance.getTaskCode());
return null;
}
if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) {
taskInstance.setState(TaskExecutionStatus.PAUSE);
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
if (taskInstance.getSubmitTime() == null) {
taskInstance.setSubmitTime(new Date());
}
if (taskInstance.getFirstSubmitTime() == null) {
taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
}
boolean saveResult = saveTaskInstance(taskInstance);
if (!saveResult) {
return null;
}
return taskInstance;
}
/**
* get submit task instance state by the work process state
* cannot modify the task state when running/kill/submit success, or this
* task instance is already exists in task queue .
* return pause if work process state is ready pause
* return stop if work process state is ready stop
* if all of above are not satisfied, return submit success
*
* @param taskInstance taskInstance
* @param processInstance processInstance
* @return process instance state
*/
@Override
public TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
TaskExecutionStatus state = taskInstance.getState();
// running, delayed or killed
// the task already exists in task queue
// return state
if (state == TaskExecutionStatus.RUNNING_EXECUTION
|| state == TaskExecutionStatus.DELAY_EXECUTION
|| state == TaskExecutionStatus.KILL
|| state == TaskExecutionStatus.DISPATCH) {
return state;
}
// return pasue /stop if process instance state is ready pause / stop
// or return submit success
if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
state = TaskExecutionStatus.PAUSE;
} else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP
|| !checkProcessStrategy(taskInstance, processInstance)) {
state = TaskExecutionStatus.KILL;
} else {
state = TaskExecutionStatus.SUBMITTED_SUCCESS;
}
return state;
}
/**
* check process instance strategy
*
* @param taskInstance taskInstance
* @return check strategy result
*/
private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
FailureStrategy failureStrategy = processInstance.getFailureStrategy();
if (failureStrategy == FailureStrategy.CONTINUE) {
return true;
}
List<TaskInstance> taskInstances =
this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
if (task.getState() == TaskExecutionStatus.FAILURE
&& task.getRetryTimes() >= task.getMaxRetryTimes()) {
return false;
}
}
return true;
}
/**
* insert or update task instance
*
* @param taskInstance taskInstance
* @return save task instance result
*/
@Override
public boolean saveTaskInstance(TaskInstance taskInstance) {
if (taskInstance.getId() != null) {
return updateTaskInstance(taskInstance);
} else {
return createTaskInstance(taskInstance);
}
}
/**
* insert task instance
*
* @param taskInstance taskInstance
* @return create task instance result
*/
@Override
public boolean createTaskInstance(TaskInstance taskInstance) {
int count = taskInstanceMapper.insert(taskInstance);
return count > 0;
}
/**
* update task instance
*
* @param taskInstance taskInstance
* @return update task instance result
*/
@Override
public boolean updateTaskInstance(TaskInstance taskInstance) {
int count = taskInstanceMapper.updateById(taskInstance);
return count > 0;
}
/**
* find task instance by id
*
* @param taskId task id
* @return task instance
*/
@Override
public TaskInstance findTaskInstanceById(Integer taskId) {
return taskInstanceMapper.selectById(taskId);
}
/**
* find task instance list by id list
*
* @param idList task id list
* @return task instance list
*/
@Override
public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
if (CollectionUtils.isEmpty(idList)) {
return new ArrayList<>();
}
return taskInstanceMapper.selectBatchIds(idList);
}
/**
* package task instance
*/
@ -1514,7 +1349,7 @@ public class ProcessServiceImpl implements ProcessService {
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processInstance.getProcessDefinition());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
TaskDefinition taskDefinition = this.findTaskDefinition(
TaskDefinition taskDefinition = taskDefinitionDao.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
this.updateTaskDefinitionResources(taskDefinition);
@ -1603,69 +1438,6 @@ public class ProcessServiceImpl implements ProcessService {
return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.getCode());
}
/**
* find valid task list by process definition id
*
* @param processInstanceId processInstanceId
* @param testFlag testFlag
* @return task instance list
*/
@Override
public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag) {
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag);
}
/**
* find previous task list by work process id
*
* @param processInstanceId processInstanceId
* @return task instance list
*/
@Override
public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
processInstance.getTestFlag());
}
/**
* update work process instance map
*
* @param processInstanceMap processInstanceMap
* @return update process instance result
*/
@Override
public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
return processInstanceMapMapper.updateById(processInstanceMap);
}
/**
* create work process instance map
*
* @param processInstanceMap processInstanceMap
* @return create process instance result
*/
@Override
public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
int count = 0;
if (processInstanceMap != null) {
return processInstanceMapMapper.insert(processInstanceMap);
}
return count;
}
/**
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
* @param parentTaskId parentTaskId
* @return process instance map
*/
@Override
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
}
/**
* delete work process map by parent process id
*
@ -1861,18 +1633,6 @@ public class ProcessServiceImpl implements ProcessService {
commandService.createCommand(cmd);
}
/**
* query all need failover task instances by host
*
* @param host host
* @return task instance list
*/
@Override
public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
return taskInstanceMapper.queryByHostAndStatus(host,
TaskExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/**
* find data source by id
*
@ -2252,8 +2012,8 @@ public class ProcessServiceImpl implements ProcessService {
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList().stream()
.filter(t -> t.getId() != null)
.map(ResourceInfo::getId)
.filter(Objects::nonNull)
.collect(toSet());
}
if (CollectionUtils.isEmpty(resourceIds)) {
@ -2477,57 +2237,12 @@ public class ProcessServiceImpl implements ProcessService {
public DagData genDagData(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> taskRelations =
this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(taskRelations);
List<TaskDefinitionLog> taskDefinitionLogList = taskDefinitionLogDao.getTaskDefineLogList(taskRelations);
List<TaskDefinition> taskDefinitions =
taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList());
return new DagData(processDefinition, taskRelations, taskDefinitions);
}
@Override
public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations) {
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(),
processTaskRelation.getPreTaskVersion()));
}
if (processTaskRelation.getPostTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion()));
}
}
if (taskDefinitionSet.isEmpty()) {
return Lists.newArrayList();
}
return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
@Override
public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
}
if (processTaskRelation.getPostTaskCode() > 0) {
taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
}
}
taskCodeVersionMap.forEach((code, version) -> {
taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(code, version));
});
return taskDefinitionLogs;
}
/**
* find task definition by code and version
*/
@Override
public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
}
/**
* find process task relation list by process
*/
@ -2571,7 +2286,7 @@ public class ProcessServiceImpl implements ProcessService {
});
}
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
taskDefinitionLogs = genTaskDefineList(taskRelationList);
taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelationList);
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
@ -2626,7 +2341,7 @@ public class ProcessServiceImpl implements ProcessService {
return processTaskMap;
}
ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
TaskInstance fatherTask = this.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId());
TaskInstance fatherTask = taskInstanceDao.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId());
if (fatherProcess != null) {
processTaskMap.put(fatherProcess, fatherTask);
@ -2936,12 +2651,13 @@ public class ProcessServiceImpl implements ProcessService {
if (processInstance != null
&& (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
List<TaskInstance> validTaskList =
findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
processInstance.getTestFlag());
List<Long> instanceTaskCodeList =
validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
List<ProcessTaskRelation> taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelations);
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelations);
List<Long> definiteTaskCodeList =
taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES)
.map(TaskDefinitionLog::getCode).collect(Collectors.toList());

32
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -66,6 +66,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
@ -121,6 +124,16 @@ public class ProcessServiceTest {
private ProcessInstanceMapper processInstanceMapper;
@Mock
private ProcessInstanceDao processInstanceDao;
@Mock
private TaskInstanceDao taskInstanceDao;
@Mock
private TaskDefinitionLogDao taskDefinitionLogDao;
@Mock
private TaskDefinitionDao taskDefinitionDao;
@Mock
private UserMapper userMapper;
@Mock
@ -652,7 +665,7 @@ public class ProcessServiceTest {
taskDefinitionLogs.add(taskDefinition);
taskDefinitionLogs.add(td2);
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs);
Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(taskDefinitionLogs);
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt()))
.thenReturn(list);
@ -729,23 +742,6 @@ public class ProcessServiceTest {
}
@Test
public void testFindTaskInstanceByIdList() {
List<Integer> emptyList = new ArrayList<>();
Mockito.when(taskInstanceMapper.selectBatchIds(emptyList)).thenReturn(new ArrayList<>());
Assertions.assertEquals(0, processService.findTaskInstanceByIdList(emptyList).size());
List<Integer> idList = Collections.singletonList(1);
TaskInstance instance = new TaskInstance();
instance.setId(1);
Mockito.when(taskInstanceMapper.selectBatchIds(idList)).thenReturn(Collections.singletonList(instance));
List<TaskInstance> taskInstanceByIdList = processService.findTaskInstanceByIdList(idList);
Assertions.assertEquals(1, taskInstanceByIdList.size());
Assertions.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
}
@Test
public void testFindLastManualProcessInterval() {
long definitionCode = 1L;

Loading…
Cancel
Save