Browse Source

Add baseDao (#14316)

3.2.1-prepare
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
aa2d9a7f05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
  3. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  4. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
  5. 18
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  6. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  7. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java
  8. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  9. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
  10. 12
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
  11. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  12. 11
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  13. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  14. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  15. 91
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java
  16. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java
  17. 69
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java
  18. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
  19. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
  20. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
  21. 18
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java
  22. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
  23. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java
  24. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
  25. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
  26. 41
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
  27. 16
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java
  28. 56
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
  29. 21
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
  30. 91
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
  31. 27
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java
  32. 26
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
  33. 13
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java
  34. 30
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
  35. 75
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
  36. 71
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
  37. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java
  38. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
  39. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
  40. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
  41. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
  42. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
  43. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
  44. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  45. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskKillOperator.java
  46. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
  47. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java
  48. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
  49. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java
  50. 13
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
  51. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowAsyncTaskExecuteFunction.java
  52. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
  53. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
  54. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java
  55. 8
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
  56. 6
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskTest.java
  57. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  58. 55
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  59. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
  60. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java

@ -53,7 +53,7 @@ public class PauseExecuteFunction implements ExecuteFunction<PauseExecuteRequest
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE, workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE,
CommandType.PAUSE.getDescp() + " by " + request.getExecuteUser().getUserName()); CommandType.PAUSE.getDescp() + " by " + request.getExecuteUser().getUserName());
if (processInstanceDao.updateProcessInstance(workflowInstance) <= 0) { if (!processInstanceDao.updateById(workflowInstance)) {
throw new ExecuteRuntimeException( throw new ExecuteRuntimeException(
String.format( String.format(
"The workflow instance: %s pause failed, due to update the workflow instance status in DB failed", "The workflow instance: %s pause failed, due to update the workflow instance status in DB failed",

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java

@ -57,7 +57,7 @@ public class StopExecuteFunction implements ExecuteFunction<StopRequest, StopRes
workflowInstance.setCommandType(CommandType.STOP); workflowInstance.setCommandType(CommandType.STOP);
workflowInstance.addHistoryCmd(CommandType.STOP); workflowInstance.addHistoryCmd(CommandType.STOP);
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, CommandType.STOP.getDescp() + " by user"); workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, CommandType.STOP.getDescp() + " by user");
if (processInstanceDao.updateProcessInstance(workflowInstance) > 0) { if (processInstanceDao.updateById(workflowInstance)) {
log.info("Workflow instance {} ready to stop success, will call master to stop the workflow instance", log.info("Workflow instance {} ready to stop success, will call master to stop the workflow instance",
workflowInstance.getName()); workflowInstance.getName());
// todo: Use specific stop command instead of WorkflowStateEventChangeCommand // todo: Use specific stop command instead of WorkflowStateEventChangeCommand

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -108,7 +108,6 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -406,8 +405,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
ApiFuncIdentificationConstant.map.get(executeType)); ApiFuncIdentificationConstant.map.get(executeType));
checkMasterExists(); checkMasterExists();
ProcessInstance workflowInstance = ProcessInstance workflowInstance = processInstanceDao.queryOptionalById(processInstanceId)
Optional.ofNullable(processInstanceDao.queryByWorkflowInstanceId(processInstanceId))
.orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId)); .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
checkState(workflowInstance.getProjectCode() == projectCode, checkState(workflowInstance.getProjectCode() == projectCode,
@ -637,10 +635,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processInstance.setCommandType(commandType); processInstance.setCommandType(commandType);
processInstance.addHistoryCmd(commandType); processInstance.addHistoryCmd(commandType);
processInstance.setStateWithDesc(executionStatus, commandType.getDescp() + "by ui"); processInstance.setStateWithDesc(executionStatus, commandType.getDescp() + "by ui");
int update = processInstanceDao.updateProcessInstance(processInstance); boolean update = processInstanceDao.updateById(processInstance);
// determine whether the process is normal // determine whether the process is normal
if (update > 0) { if (update) {
log.info("Process instance state is updated to {} in database, processInstanceName:{}.", log.info("Process instance state is updated to {} in database, processInstanceName:{}.",
executionStatus.getDesc(), processInstance.getName()); executionStatus.getDesc(), processInstance.getName());
// directly send the process instance state change event to target master, not guarantee the event send // directly send the process instance state change event to target master, not guarantee the event send

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java

@ -92,7 +92,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Result<ResponseTaskLog> queryLog(User loginUser, int taskInstId, int skipLineNum, int limit) { public Result<ResponseTaskLog> queryLog(User loginUser, int taskInstId, int skipLineNum, int limit) {
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId); TaskInstance taskInstance = taskInstanceDao.queryById(taskInstId);
if (taskInstance == null) { if (taskInstance == null) {
log.error("Task instance does not exist, taskInstanceId:{}.", taskInstId); log.error("Task instance does not exist, taskInstanceId:{}.", taskInstId);
@ -120,7 +120,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
*/ */
@Override @Override
public byte[] getLogBytes(User loginUser, int taskInstId) { public byte[] getLogBytes(User loginUser, int taskInstId) {
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId); TaskInstance taskInstance = taskInstanceDao.queryById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null"); throw new ServiceException("task instance is null or host is null");
} }
@ -149,7 +149,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
return result; return result;
} }
// check whether the task instance can be found // check whether the task instance can be found
TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId); TaskInstance task = taskInstanceDao.queryById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) { if (task == null || StringUtils.isBlank(task.getHost())) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result; return result;
@ -182,7 +182,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
throw new ServiceException("user has no permission"); throw new ServiceException("user has no permission");
} }
// check whether the task instance can be found // check whether the task instance can be found
TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId); TaskInstance task = taskInstanceDao.queryById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) { if (task == null || StringUtils.isBlank(task.getHost())) {
throw new ServiceException("task instance is null or host is null"); throw new ServiceException("task instance is null or host is null");
} }

18
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -697,7 +697,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int workflowDefinitionVersion) { int workflowDefinitionVersion) {
ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode).orElse(null); ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode).orElse(null);
if (workflowDefinition == null || workflowDefinition.getVersion() != workflowDefinitionVersion) { if (workflowDefinition == null || workflowDefinition.getVersion() != workflowDefinitionVersion) {
workflowDefinition = processDefinitionLogDao.queryProcessDefinitionLog(workflowDefinitionCode, workflowDefinition = processDefinitionLogDao.queryByDefinitionCodeAndVersion(workflowDefinitionCode,
workflowDefinitionVersion); workflowDefinitionVersion);
} }
return Optional.ofNullable(workflowDefinition); return Optional.ofNullable(workflowDefinition);
@ -1856,15 +1856,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<DependentSimplifyDefinition> processDefinitions = processDefinitionMapper List<DependentSimplifyDefinition> processDefinitions = processDefinitionMapper
.queryDefinitionListByProjectCodeAndProcessDefinitionCodes(projectCode, definitionCodesSet); .queryDefinitionListByProjectCodeAndProcessDefinitionCodes(projectCode, definitionCodesSet);
// query process task relation
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryProcessTaskRelationsByProcessDefinitionCode(
processDefinitions.get(0).getCode(),
processDefinitions.get(0).getVersion());
// query task definition log // query task definition log
List<TaskDefinitionLog> taskDefinitionLogsList = List<TaskDefinitionLog> taskDefinitionLogsList = taskDefinitionLogDao.queryByWorkflowDefinitionCodeAndVersion(
taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations); processDefinitions.get(0).getCode(), processDefinitions.get(0).getVersion());
List<DependentSimplifyDefinition> taskDefinitionList = new ArrayList<>(); List<DependentSimplifyDefinition> taskDefinitionList = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogsList) { for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogsList) {
@ -1914,8 +1908,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit); List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit);
processInstanceList.forEach(processInstance -> processInstance processInstanceList.forEach(processInstance -> processInstance
.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()))); .setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
List<TaskDefinitionLog> taskDefinitionList = taskDefinitionLogDao.getTaskDefineLogList(processTaskRelationMapper List<TaskDefinitionLog> taskDefinitionList = taskDefinitionLogDao.queryByWorkflowDefinitionCodeAndVersion(
.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode())); processDefinition.getCode(), processDefinition.getVersion());
Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream() Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
@ -2153,7 +2147,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (isCopy) { if (isCopy) {
log.info("Copy process definition..."); log.info("Copy process definition...");
List<TaskDefinitionLog> taskDefinitionLogs = List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations); taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);
Map<Long, Long> taskCodeMap = new HashMap<>(); Map<Long, Long> taskCodeMap = new HashMap<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
try { try {

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -283,7 +283,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Override @Override
public ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer workflowInstanceId) { public ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer workflowInstanceId) {
ProcessInstance processInstance = processInstanceDao.queryByWorkflowInstanceId(workflowInstanceId); ProcessInstance processInstance = processInstanceDao.queryById(workflowInstanceId);
if (processInstance == null) { if (processInstance == null) {
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId); throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId);
} }
@ -485,7 +485,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result; return result;
} }
List<TaskInstance> taskInstanceList = List<TaskInstance> taskInstanceList =
taskInstanceDao.findValidTaskListByProcessId(processId, processInstance.getTestFlag()); taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processId, processInstance.getTestFlag());
addDependResultForTaskList(loginUser, taskInstanceList); addDependResultForTaskList(loginUser, taskInstanceList);
Map<String, Object> resultMap = new HashMap<>(); Map<String, Object> resultMap = new HashMap<>();
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString()); resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
@ -498,7 +498,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Override @Override
public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) { public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) {
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId); TaskInstance taskInstance = taskInstanceDao.queryById(taskId);
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
if (taskInstance == null) { if (taskInstance == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
@ -516,11 +516,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId); throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
} }
List<RelationSubWorkflow> relationSubWorkflows = relationSubWorkflowMapper List<RelationSubWorkflow> relationSubWorkflows = relationSubWorkflowMapper
.queryAllSubProcessInstance(Long.valueOf(taskInstance.getProcessInstanceId()), .queryAllSubProcessInstance((long) taskInstance.getProcessInstanceId(),
taskInstance.getTaskCode()); taskInstance.getTaskCode());
List<Long> allSubProcessInstanceId = relationSubWorkflows.stream() List<Long> allSubProcessInstanceId = relationSubWorkflows.stream()
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList()); .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList());
List<ProcessInstance> allSubWorkflows = processInstanceDao.queryBatchIds(allSubProcessInstanceId); List<ProcessInstance> allSubWorkflows = processInstanceDao.queryByIds(allSubProcessInstanceId);
if (allSubWorkflows == null || allSubWorkflows.isEmpty()) { if (allSubWorkflows == null || allSubWorkflows.isEmpty()) {
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId); putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
@ -626,7 +626,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result; return result;
} }
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId); TaskInstance taskInstance = taskInstanceDao.queryById(taskId);
if (taskInstance == null) { if (taskInstance == null) {
log.error("Task instance does not exist, projectCode:{}, taskInstanceId{}.", projectCode, taskId); log.error("Task instance does not exist, projectCode:{}, taskInstanceId{}.", projectCode, taskId);
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
@ -786,8 +786,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
processInstance.setProcessDefinitionVersion(insertVersion); processInstance.setProcessDefinitionVersion(insertVersion);
int update = processInstanceDao.updateProcessInstance(processInstance); boolean update = processInstanceDao.updateById(processInstance);
if (update == 0) { if (!update) {
log.error( log.error(
"Update process instance version error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}", "Update process instance version error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}",
projectCode, processDefinition.getCode(), insertVersion); projectCode, processDefinition.getCode(), insertVersion);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java

@ -44,7 +44,7 @@ public class TaskDefinitionLogServiceImpl implements TaskDefinitionLogService {
@Override @Override
public void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode) { public void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode) {
List<ProcessTaskRelationLog> processTaskRelations = List<ProcessTaskRelationLog> processTaskRelations =
processTaskRelationLogDao.findByWorkflowDefinitionCode(workflowDefinitionCode); processTaskRelationLogDao.queryByWorkflowDefinitionCode(workflowDefinitionCode);
if (CollectionUtils.isEmpty(processTaskRelations)) { if (CollectionUtils.isEmpty(processTaskRelations)) {
return; return;
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -371,7 +371,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Override @Override
public void deleteByWorkflowInstanceId(Integer workflowInstanceId) { public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
List<TaskInstance> needToDeleteTaskInstances = List<TaskInstance> needToDeleteTaskInstances =
taskInstanceDao.findTaskInstanceByWorkflowInstanceId(workflowInstanceId); taskInstanceDao.queryByWorkflowInstanceId(workflowInstanceId);
if (org.apache.commons.collections4.CollectionUtils.isEmpty(needToDeleteTaskInstances)) { if (org.apache.commons.collections4.CollectionUtils.isEmpty(needToDeleteTaskInstances)) {
return; return;
} }

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java

@ -530,7 +530,7 @@ public class ExecuteFunctionServiceTest {
when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN)) when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
.thenReturn(checkProjectAndAuth()); .thenReturn(checkProjectAndAuth());
when(processInstanceDao.queryByWorkflowInstanceId(processInstanceId)).thenReturn(processInstance); when(processInstanceDao.queryOptionalById(processInstanceId)).thenReturn(Optional.of(processInstance));
when(processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(processDefinitionCode, when(processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(processDefinitionCode,
processDefinitionVersion)).thenReturn(processDefinition); processDefinitionVersion)).thenReturn(processDefinition);
Map<String, Object> result = Map<String, Object> result =

12
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

@ -85,7 +85,7 @@ public class LoggerServiceTest {
loginUser.setId(1); loginUser.setId(1);
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
taskInstance.setExecutorId(loginUser.getId() + 1); taskInstance.setExecutorId(loginUser.getId() + 1);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance);
Result result = loggerService.queryLog(loginUser, 2, 1, 1); Result result = loggerService.queryLog(loginUser, 2, 1, 1);
// TASK_INSTANCE_NOT_FOUND // TASK_INSTANCE_NOT_FOUND
Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue()); Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue());
@ -123,7 +123,7 @@ public class LoggerServiceTest {
// SUCCESS // SUCCESS
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG); Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance);
result = loggerService.queryLog(loginUser, 1, 1, 1); result = loggerService.queryLog(loginUser, 1, 1, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
} }
@ -135,7 +135,7 @@ public class LoggerServiceTest {
loginUser.setId(1); loginUser.setId(1);
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
taskInstance.setExecutorId(loginUser.getId() + 1); taskInstance.setExecutorId(loginUser.getId() + 1);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance);
// task instance is null // task instance is null
try { try {
@ -198,7 +198,7 @@ public class LoggerServiceTest {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance);
TaskDefinition taskDefinition = new TaskDefinition(); TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode); taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L); taskDefinition.setCode(1L);
@ -208,7 +208,7 @@ public class LoggerServiceTest {
taskInstance.setHost("127.0.0.1:8080"); taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log"); taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1); result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); Assertions.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
@ -236,7 +236,7 @@ public class LoggerServiceTest {
taskInstance.setLogPath("/temp/log"); taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG)) Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG))
.thenReturn(result); .thenReturn(result);
Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString())) Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
.thenReturn(new byte[0]); .thenReturn(new byte[0]);

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -27,7 +27,6 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -801,9 +800,6 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1); Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
Mockito.when(projectService.checkProjectAndAuth(user, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(user, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode))
.thenReturn(getProcessTaskRelation());
Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(new ArrayList<>());
Map<String, Object> taskNotNuLLRes = Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10); processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10);
Assertions.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));

11
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -454,7 +454,8 @@ public class ProcessInstanceServiceTest {
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId())) when(processService.findProcessInstanceDetailById(processInstance.getId()))
.thenReturn(Optional.of(processInstance)); .thenReturn(Optional.of(processInstance));
when(taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())) when(taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(),
processInstance.getTestFlag()))
.thenReturn(taskInstanceList); .thenReturn(taskInstanceList);
when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res); when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res);
Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1); Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1);
@ -497,7 +498,7 @@ public class ProcessInstanceServiceTest {
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(null); when(taskInstanceDao.queryById(1)).thenReturn(null);
Map<String, Object> taskNullRes = Map<String, Object> taskNullRes =
processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1); processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1);
Assertions.assertEquals(Status.TASK_INSTANCE_NOT_EXISTS, taskNullRes.get(Constants.STATUS)); Assertions.assertEquals(Status.TASK_INSTANCE_NOT_EXISTS, taskNullRes.get(Constants.STATUS));
@ -507,7 +508,7 @@ public class ProcessInstanceServiceTest {
taskInstance.setTaskType("HTTP"); taskInstance.setTaskType("HTTP");
taskInstance.setProcessInstanceId(1); taskInstance.setProcessInstanceId(1);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); when(taskInstanceDao.queryById(1)).thenReturn(taskInstance);
TaskDefinition taskDefinition = new TaskDefinition(); TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode); taskDefinition.setProjectCode(projectCode);
when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
@ -527,7 +528,7 @@ public class ProcessInstanceServiceTest {
subTask.setTaskType("SUB_PROCESS"); subTask.setTaskType("SUB_PROCESS");
subTask.setProcessInstanceId(1); subTask.setProcessInstanceId(1);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
when(taskInstanceDao.findTaskInstanceById(subTask.getId())).thenReturn(subTask); when(taskInstanceDao.queryById(subTask.getId())).thenReturn(subTask);
when(processService.findSubProcessInstance(subTask.getProcessInstanceId(), subTask.getId())).thenReturn(null); when(processService.findSubProcessInstance(subTask.getProcessInstanceId(), subTask.getId())).thenReturn(null);
Map<String, Object> subprocessNotExistRes = Map<String, Object> subprocessNotExistRes =
processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1); processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1);
@ -593,7 +594,7 @@ public class ProcessInstanceServiceTest {
when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant); when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
when(processService.getTenantForProcess(Mockito.anyString(), Mockito.anyInt())) when(processService.getTenantForProcess(Mockito.anyString(), Mockito.anyInt()))
.thenReturn(tenant.getTenantCode()); .thenReturn(tenant.getTenantCode());
when(processInstanceDao.updateProcessInstance(processInstance)).thenReturn(1); when(processInstanceDao.updateById(processInstance)).thenReturn(true);
when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.FALSE)).thenReturn(1); when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.FALSE)).thenReturn(1);
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -395,8 +395,8 @@ public class TaskInstanceServiceTest {
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(taskInstanceMapper.selectById(1)).thenReturn(task); when(taskInstanceMapper.selectById(1)).thenReturn(task);
when(taskInstanceDao.findTaskInstanceByCacheKey(cacheKey)).thenReturn(task, null); when(taskInstanceDao.queryByCacheKey(cacheKey)).thenReturn(task, null);
when(taskInstanceDao.updateTaskInstance(task)).thenReturn(true); when(taskInstanceDao.updateById(task)).thenReturn(true);
TaskInstanceRemoveCacheResponse response = TaskInstanceRemoveCacheResponse response =
taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId); taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId);

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -74,6 +74,7 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
Integer queryMaxVersionForDefinition(@Param("code") long code); Integer queryMaxVersionForDefinition(@Param("code") long code);
/** /**
* todo: rename to query by code and version
* @param taskDefinitions taskDefinition list * @param taskDefinitions taskDefinition list
* @return list * @return list
*/ */

91
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.commons.collections4.CollectionUtils;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.NonNull;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public abstract class BaseDao<ENTITY, MYBATIS_MAPPER extends BaseMapper<ENTITY>> implements IDao<ENTITY> {
protected MYBATIS_MAPPER mybatisMapper;
public BaseDao(@NonNull MYBATIS_MAPPER mybatisMapper) {
this.mybatisMapper = mybatisMapper;
}
@Override
public ENTITY queryById(@NonNull Serializable id) {
return mybatisMapper.selectById(id);
}
@Override
public Optional<ENTITY> queryOptionalById(@NonNull Serializable id) {
return Optional.ofNullable(queryById(id));
}
@Override
public List<ENTITY> queryByIds(Collection<? extends Serializable> ids) {
if (CollectionUtils.isEmpty(ids)) {
return Collections.emptyList();
}
return mybatisMapper.selectBatchIds(ids);
}
@Override
public int insert(@NonNull ENTITY model) {
return mybatisMapper.insert(model);
}
@Override
public void insertBatch(Collection<ENTITY> models) {
if (CollectionUtils.isEmpty(models)) {
return;
}
for (ENTITY model : models) {
insert(model);
}
}
@Override
public boolean updateById(@NonNull ENTITY model) {
return mybatisMapper.updateById(model) > 0;
}
@Override
public boolean deleteById(@NonNull Serializable id) {
return mybatisMapper.deleteById(id) > 0;
}
@Override
public boolean deleteByIds(Collection<? extends Serializable> ids) {
if (CollectionUtils.isEmpty(ids)) {
return true;
}
return mybatisMapper.deleteBatchIds(ids) > 0;
}
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java

@ -17,7 +17,9 @@
package org.apache.dolphinscheduler.dao.repository; package org.apache.dolphinscheduler.dao.repository;
public interface DqExecuteResultDao { import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
public interface DqExecuteResultDao extends IDao<DqExecuteResult> {
void deleteByWorkflowInstanceId(Integer workflowInstanceId); void deleteByWorkflowInstanceId(Integer workflowInstanceId);
} }

69
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import lombok.NonNull;
public interface IDao<Entity> {
/**
* Query the entity by primary key.
*/
Entity queryById(@NonNull Serializable id);
/**
* Same with {@link #queryById(Serializable)} but return {@link Optional} instead of null.
*/
Optional<Entity> queryOptionalById(@NonNull Serializable id);
/**
* Query the entity by primary keys.
*/
List<Entity> queryByIds(Collection<? extends Serializable> ids);
/**
* Insert the entity.
*/
int insert(@NonNull Entity model);
/**
* Insert the entities.
*/
void insertBatch(Collection<Entity> models);
/**
* Update the entity by primary key.
*/
boolean updateById(@NonNull Entity model);
/**
* Delete the entity by primary key.
*/
boolean deleteById(@NonNull Serializable id);
/**
* Delete the entities by primary keys.
*/
boolean deleteByIds(Collection<? extends Serializable> ids);
}

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.dao.repository; package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.model.PageListingResult; import org.apache.dolphinscheduler.dao.model.PageListingResult;
import java.util.Collection; import java.util.Collection;
@ -27,7 +26,7 @@ import java.util.Optional;
import javax.annotation.Nullable; import javax.annotation.Nullable;
public interface ProcessDefinitionDao { public interface ProcessDefinitionDao extends IDao<ProcessDefinition> {
/** /**
* Listing the process definition belongs to the given userId and projectCode. * Listing the process definition belongs to the given userId and projectCode.
@ -42,17 +41,8 @@ public interface ProcessDefinitionDao {
int userId, int userId,
long projectCode); long projectCode);
/**
* query process definitions by definition codes and versions
* @param processInstances process instances where codes and version come from
* @return
*/
List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances);
Optional<ProcessDefinition> queryByCode(long code); Optional<ProcessDefinition> queryByCode(long code);
void deleteById(Integer workflowDefinitionId);
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
List<ProcessDefinition> queryByCodes(Collection<Long> processDefinitionCodes); List<ProcessDefinition> queryByCodes(Collection<Long> processDefinitionCodes);

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java

@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
public interface ProcessDefinitionLogDao { public interface ProcessDefinitionLogDao extends IDao<ProcessDefinitionLog> {
ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion); ProcessDefinitionLog queryByDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion);
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
} }

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

@ -22,29 +22,14 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import java.util.Date; public interface ProcessInstanceDao extends IDao<ProcessInstance> {
import java.util.List;
public interface ProcessInstanceDao {
public int insertProcessInstance(ProcessInstance processInstance);
public int updateProcessInstance(ProcessInstance processInstance);
/** /**
* insert or update work process instance to database * insert or update work process instance to database
* *
* @param processInstance processInstance * @param processInstance processInstance
*/ */
public int upsertProcessInstance(ProcessInstance processInstance); void upsertProcessInstance(ProcessInstance processInstance);
List<ProcessInstance> queryBatchIds(List<Long> processInstanceIds);
void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
void deleteById(Integer workflowInstanceId);
ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId);
/** /**
* find last scheduler process instance in the date interval * find last scheduler process instance in the date interval
@ -53,7 +38,7 @@ public interface ProcessInstanceDao {
* @param dateInterval dateInterval * @param dateInterval dateInterval
* @return process instance * @return process instance
*/ */
ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
/** /**
* find last manual process instance interval * find last manual process instance interval
@ -62,17 +47,7 @@ public interface ProcessInstanceDao {
* @param dateInterval dateInterval * @param dateInterval dateInterval
* @return process instance * @return process instance
*/ */
ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
/**
* find last running process instance
*
* @param definitionCode process definition code
* @param startTime start time
* @param endTime end time
* @return process instance
*/
ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag);
/** /**
* query first schedule process instance * query first schedule process instance
@ -90,5 +65,5 @@ public interface ProcessInstanceDao {
*/ */
ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode); ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode);
ProcessInstance findSubProcessInstanceByParentId(Integer processInstanceId, Integer taskInstanceId); ProcessInstance querySubProcessInstanceByParentId(Integer processInstanceId, Integer taskInstanceId);
} }

18
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java

@ -24,21 +24,7 @@ import java.util.List;
/** /**
* Process Instance Map DAO * Process Instance Map DAO
*/ */
public interface ProcessInstanceMapDao { public interface ProcessInstanceMapDao extends IDao<ProcessInstanceMap> {
/**
* Update process instance map
* @param processInstanceMap process instance map
* @return result
*/
int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
/**
* Create process instance map to DB.
* @param processInstanceMap process instance map
* @return result
*/
int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
/** /**
* find work process map by parent process id and parent task id. * find work process map by parent process id and parent task id.
@ -46,7 +32,7 @@ public interface ProcessInstanceMapDao {
* @param parentTaskId parentTaskId * @param parentTaskId parentTaskId
* @return process instance map * @return process instance map
*/ */
ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId); ProcessInstanceMap queryWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId);
List<Integer> querySubWorkflowInstanceIds(int workflowInstanceId); List<Integer> querySubWorkflowInstanceIds(int workflowInstanceId);

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java

@ -23,9 +23,9 @@ import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.List;
public interface ProcessTaskRelationLogDao { public interface ProcessTaskRelationLogDao extends IDao<ProcessTaskRelationLog> {
List<ProcessTaskRelationLog> findByWorkflowDefinitionCode(long workflowDefinitionCode); List<ProcessTaskRelationLog> queryByWorkflowDefinitionCode(long workflowDefinitionCode);
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java

@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
public interface ProjectDao { public interface ProjectDao extends IDao<Project> {
List<Project> queryByCodes(Collection<Long> projectCodes); List<Project> queryByCodes(Collection<Long> projectCodes);

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java

@ -26,10 +26,11 @@ import java.util.Set;
/** /**
* Task Instance DAO * Task Instance DAO
*/ */
public interface TaskDefinitionDao { public interface TaskDefinitionDao extends IDao<TaskDefinition> {
/** /**
* Get list of task definition by process definition code * Get list of task definition by process definition code
*
* @param processDefinitionCode process definition code * @param processDefinitionCode process definition code
* @return list of task definition * @return list of task definition
*/ */

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java

@ -26,21 +26,12 @@ import java.util.Set;
/** /**
* Task Definition Log DAO * Task Definition Log DAO
*/ */
public interface TaskDefinitionLogDao { public interface TaskDefinitionLogDao extends IDao<TaskDefinitionLog> {
/** List<TaskDefinitionLog> queryByWorkflowDefinitionCodeAndVersion(Long workflowDefinitionCode,
* Get task definition log list Integer workflowDefinitionVersion);
* @param processTaskRelations list of process task relation
* @return list of task definition
*/
List<TaskDefinitionLog> getTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations);
/** List<TaskDefinitionLog> queryTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations);
* Query task definition log list by process task relation list
* @param processTaskRelations list of task relation
* @return list of task definition log
*/
List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations);
void deleteByTaskDefinitionCodes(Set<Long> taskDefinitionCodes); void deleteByTaskDefinitionCodes(Set<Long> taskDefinitionCodes);
} }

41
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java

@ -25,31 +25,18 @@ import java.util.List;
/** /**
* Task Instance DAO * Task Instance DAO
*/ */
public interface TaskInstanceDao { public interface TaskInstanceDao extends IDao<TaskInstance> {
/** /**
* Update or Insert task instance to DB. * Update or Insert task instance to DB.
* ID is null -> Insert * ID is null -> Insert
* ID is not null -> Update * ID is not null -> Update
*
* @param taskInstance task instance * @param taskInstance task instance
* @return result * @return result
*/ */
boolean upsertTaskInstance(TaskInstance taskInstance); boolean upsertTaskInstance(TaskInstance taskInstance);
/**
* Insert task instance to DB.
* @param taskInstance task instance
* @return result
*/
boolean insertTaskInstance(TaskInstance taskInstance);
/**
* Update task instance to DB.
* @param taskInstance task instance
* @return result
*/
boolean updateTaskInstance(TaskInstance taskInstance);
/** /**
* Submit a task instance to DB. * Submit a task instance to DB.
* @param taskInstance task instance * @param taskInstance task instance
@ -64,7 +51,7 @@ public interface TaskInstanceDao {
* @param testFlag test flag * @param testFlag test flag
* @return list of valid task instance * @return list of valid task instance
*/ */
List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag); List<TaskInstance> queryValidTaskListByWorkflowInstanceId(Integer processInstanceId, int testFlag);
/** /**
* Query list of task instance by process instance id and task code * Query list of task instance by process instance id and task code
@ -72,28 +59,21 @@ public interface TaskInstanceDao {
* @param taskCode task code * @param taskCode task code
* @return list of valid task instance * @return list of valid task instance
*/ */
TaskInstance findTaskByInstanceIdAndCode(Integer processInstanceId, Long taskCode); TaskInstance queryByWorkflowInstanceIdAndTaskCode(Integer processInstanceId, Long taskCode);
/** /**
* find previous task list by work process id * find previous task list by work process id
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @return task instance list * @return task instance list
*/ */
List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId); List<TaskInstance> queryPreviousTaskListByWorkflowInstanceId(Integer processInstanceId);
/**
* find task instance by id
* @param taskId task id
* @return task instance
*/
TaskInstance findTaskInstanceById(Integer taskId);
/** /**
* find task instance by cache_key * find task instance by cache_key
* @param cacheKey cache key * @param cacheKey cache key
* @return task instance * @return task instance
*/ */
TaskInstance findTaskInstanceByCacheKey(String cacheKey); TaskInstance queryByCacheKey(String cacheKey);
/** /**
* clear task instance cache by cache_key * clear task instance cache by cache_key
@ -102,14 +82,7 @@ public interface TaskInstanceDao {
*/ */
Boolean clearCacheByCacheKey(String cacheKey); Boolean clearCacheByCacheKey(String cacheKey);
/**
* find task instance list by id list
* @param idList task id list
* @return task instance list
*/
List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList);
void deleteByWorkflowInstanceId(int workflowInstanceId); void deleteByWorkflowInstanceId(int workflowInstanceId);
List<TaskInstance> findTaskInstanceByWorkflowInstanceId(Integer processInstanceId); List<TaskInstance> queryByWorkflowInstanceId(Integer processInstanceId);
} }

16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java

@ -17,20 +17,26 @@
package org.apache.dolphinscheduler.dao.repository.impl; package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
import org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper; import org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao; import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao;
import org.springframework.beans.factory.annotation.Autowired; import lombok.NonNull;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
@Repository @Repository
public class DqExecuteResultDaoImpl implements DqExecuteResultDao { public class DqExecuteResultDaoImpl extends BaseDao<DqExecuteResult, DqExecuteResultMapper>
implements
DqExecuteResultDao {
@Autowired public DqExecuteResultDaoImpl(@NonNull DqExecuteResultMapper dqExecuteResultMapper) {
private DqExecuteResultMapper dqExecuteResultMapper; super(dqExecuteResultMapper);
}
@Override @Override
public void deleteByWorkflowInstanceId(Integer workflowInstanceId) { public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
dqExecuteResultMapper.deleteByWorkflowInstanceId(workflowInstanceId); mybatisMapper.deleteByWorkflowInstanceId(workflowInstanceId);
} }
} }

56
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java

@ -18,43 +18,40 @@
package org.apache.dolphinscheduler.dao.repository.impl; package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult; import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired; import lombok.NonNull;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@Repository @Repository
public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao { public class ProcessDefinitionDaoImpl extends BaseDao<ProcessDefinition, ProcessDefinitionMapper>
implements
ProcessDefinitionDao {
@Autowired public ProcessDefinitionDaoImpl(@NonNull ProcessDefinitionMapper processDefinitionMapper) {
private ProcessDefinitionMapper processDefinitionMapper; super(processDefinitionMapper);
@Autowired }
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Override @Override
public PageListingResult<ProcessDefinition> listingProcessDefinition(int pageNumber, int pageSize, String searchVal, public PageListingResult<ProcessDefinition> listingProcessDefinition(int pageNumber, int pageSize, String searchVal,
int userId, long projectCode) { int userId, long projectCode) {
Page<ProcessDefinition> page = new Page<>(pageNumber, pageSize); Page<ProcessDefinition> page = new Page<>(pageNumber, pageSize);
IPage<ProcessDefinition> processDefinitions = IPage<ProcessDefinition> processDefinitions =
processDefinitionMapper.queryDefineListPaging(page, searchVal, userId, projectCode); mybatisMapper.queryDefineListPaging(page, searchVal, userId, projectCode);
return PageListingResult.<ProcessDefinition>builder() return PageListingResult.<ProcessDefinition>builder()
.totalCount(processDefinitions.getTotal()) .totalCount(processDefinitions.getTotal())
@ -64,41 +61,14 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
.build(); .build();
} }
@Override
public List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances) {
if (Objects.isNull(processInstances) || processInstances.isEmpty()) {
return new ArrayList<>();
}
List<ProcessDefinitionLog> processDefinitionLogs = processInstances
.parallelStream()
.map(processInstance -> {
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
return processDefinitionLog;
})
.collect(Collectors.toList());
List<ProcessDefinition> processDefinitions =
processDefinitionLogs.stream().map(log -> (ProcessDefinition) log).collect(Collectors.toList());
return processDefinitions;
}
@Override @Override
public Optional<ProcessDefinition> queryByCode(long code) { public Optional<ProcessDefinition> queryByCode(long code) {
return Optional.ofNullable( return Optional.ofNullable(mybatisMapper.queryByCode(code));
processDefinitionMapper.queryByCode(code));
}
@Override
public void deleteById(Integer workflowDefinitionId) {
processDefinitionMapper.deleteById(workflowDefinitionId);
} }
@Override @Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processDefinitionMapper.deleteByCode(workflowDefinitionCode); mybatisMapper.deleteByCode(workflowDefinitionCode);
} }
@Override @Override
@ -106,6 +76,6 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
if (CollectionUtils.isEmpty(processDefinitionCodes)) { if (CollectionUtils.isEmpty(processDefinitionCodes)) {
return Collections.emptyList(); return Collections.emptyList();
} }
return processDefinitionMapper.queryByCodes(processDefinitionCodes); return mybatisMapper.queryByCodes(processDefinitionCodes);
} }
} }

21
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java

@ -19,25 +19,30 @@ package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.springframework.beans.factory.annotation.Autowired; import lombok.NonNull;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
@Repository @Repository
public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao { public class ProcessDefinitionLogDaoImpl extends BaseDao<ProcessDefinitionLog, ProcessDefinitionLogMapper>
implements
ProcessDefinitionLogDao {
@Autowired public ProcessDefinitionLogDaoImpl(@NonNull ProcessDefinitionLogMapper processDefinitionLogMapper) {
private ProcessDefinitionLogMapper processDefinitionLogMapper; super(processDefinitionLogMapper);
}
@Override @Override
public ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion) { public ProcessDefinitionLog queryByDefinitionCodeAndVersion(long workflowDefinitionCode,
return processDefinitionLogMapper.queryByDefinitionCodeAndVersion(workflowDefinitionCode, int workflowDefinitionVersion) {
workflowDefinitionVersion); return mybatisMapper.queryByDefinitionCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion);
} }
@Override @Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode); mybatisMapper.deleteByProcessDefinitionCode(workflowDefinitionCode);
} }
} }

91
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

@ -17,20 +17,14 @@
package org.apache.dolphinscheduler.dao.repository.impl; package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import lombok.NonNull; import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -39,57 +33,24 @@ import org.springframework.stereotype.Repository;
@Slf4j @Slf4j
@Repository @Repository
public class ProcessInstanceDaoImpl implements ProcessInstanceDao { public class ProcessInstanceDaoImpl extends BaseDao<ProcessInstance, ProcessInstanceMapper>
implements
@Autowired ProcessInstanceDao {
private ProcessInstanceMapper processInstanceMapper;
@Autowired @Autowired
private ProcessInstanceMapMapper processInstanceMapMapper; private ProcessInstanceMapMapper processInstanceMapMapper;
@Override public ProcessInstanceDaoImpl(@NonNull ProcessInstanceMapper processInstanceMapper) {
public int insertProcessInstance(ProcessInstance processInstance) { super(processInstanceMapper);
return processInstanceMapper.insert(processInstance);
}
@Override
public int updateProcessInstance(ProcessInstance processInstance) {
return processInstanceMapper.updateById(processInstance);
} }
@Override @Override
public int upsertProcessInstance(@NonNull ProcessInstance processInstance) { public void upsertProcessInstance(@NonNull ProcessInstance processInstance) {
if (processInstance.getId() != null) { if (processInstance.getId() != null) {
return updateProcessInstance(processInstance); updateById(processInstance);
} else { } else {
return insertProcessInstance(processInstance); insert(processInstance);
}
}
@Override
public List<ProcessInstance> queryBatchIds(List<Long> processInstanceIds) {
if (CollectionUtils.isEmpty(processInstanceIds)) {
return new ArrayList<>();
}
return processInstanceMapper.selectBatchIds(processInstanceIds);
}
@Override
public void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds) {
if (CollectionUtils.isEmpty(needToDeleteWorkflowInstanceIds)) {
return;
} }
processInstanceMapper.deleteBatchIds(needToDeleteWorkflowInstanceIds);
}
@Override
public void deleteById(Integer workflowInstanceId) {
processInstanceMapper.deleteById(workflowInstanceId);
}
@Override
public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) {
return processInstanceMapper.selectById(workflowInstanceId);
} }
/** /**
@ -100,9 +61,9 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
* @return process instance * @return process instance
*/ */
@Override @Override
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, public ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval,
int testFlag) { int testFlag) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode, return mybatisMapper.queryLastSchedulerProcess(definitionCode,
dateInterval.getStartTime(), dateInterval.getStartTime(),
dateInterval.getEndTime(), dateInterval.getEndTime(),
testFlag); testFlag);
@ -116,30 +77,14 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
* @return process instance * @return process instance
*/ */
@Override @Override
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) { public ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval,
return processInstanceMapper.queryLastManualProcess(definitionCode, int testFlag) {
return mybatisMapper.queryLastManualProcess(definitionCode,
dateInterval.getStartTime(), dateInterval.getStartTime(),
dateInterval.getEndTime(), dateInterval.getEndTime(),
testFlag); testFlag);
} }
/**
* find last running process instance
*
* @param definitionCode process definition code
* @param startTime start time
* @param endTime end time
* @return process instance
*/
@Override
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
endTime,
testFlag,
WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/** /**
* query first schedule process instance * query first schedule process instance
* *
@ -148,7 +93,7 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
*/ */
@Override @Override
public ProcessInstance queryFirstScheduleProcessInstance(Long definitionCode) { public ProcessInstance queryFirstScheduleProcessInstance(Long definitionCode) {
return processInstanceMapper.queryFirstScheduleProcessInstance(definitionCode); return mybatisMapper.queryFirstScheduleProcessInstance(definitionCode);
} }
/** /**
@ -159,18 +104,18 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
*/ */
@Override @Override
public ProcessInstance queryFirstStartProcessInstance(Long definitionCode) { public ProcessInstance queryFirstStartProcessInstance(Long definitionCode) {
return processInstanceMapper.queryFirstStartProcessInstance(definitionCode); return mybatisMapper.queryFirstStartProcessInstance(definitionCode);
} }
@Override @Override
public ProcessInstance findSubProcessInstanceByParentId(Integer processInstanceId, Integer taskInstanceId) { public ProcessInstance querySubProcessInstanceByParentId(Integer processInstanceId, Integer taskInstanceId) {
ProcessInstance processInstance = null; ProcessInstance processInstance = null;
ProcessInstanceMap processInstanceMap = ProcessInstanceMap processInstanceMap =
processInstanceMapMapper.queryByParentId(processInstanceId, taskInstanceId); processInstanceMapMapper.queryByParentId(processInstanceId, taskInstanceId);
if (processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0) { if (processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0) {
return processInstance; return processInstance;
} }
processInstance = queryByWorkflowInstanceId(processInstanceMap.getProcessInstanceId()); processInstance = queryById(processInstanceMap.getProcessInstanceId());
return processInstance; return processInstance;
} }
} }

27
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java

@ -19,46 +19,39 @@ package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
import java.util.List; import java.util.List;
import lombok.NonNull; import lombok.NonNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
/** /**
* Process Instance Map Dao implementation * Process Instance Map Dao implementation
*/ */
@Repository @Repository
public class ProcessInstanceMapDaoImpl implements ProcessInstanceMapDao { public class ProcessInstanceMapDaoImpl extends BaseDao<ProcessInstanceMap, ProcessInstanceMapMapper>
implements
ProcessInstanceMapDao {
@Autowired public ProcessInstanceMapDaoImpl(@NonNull ProcessInstanceMapMapper processInstanceMapMapper) {
private ProcessInstanceMapMapper processInstanceMapMapper; super(processInstanceMapMapper);
@Override
public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
return processInstanceMapMapper.updateById(processInstanceMap);
}
@Override
public int createWorkProcessInstanceMap(@NonNull ProcessInstanceMap processInstanceMap) {
return processInstanceMapMapper.insert(processInstanceMap);
} }
@Override @Override
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { public ProcessInstanceMap queryWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId); return mybatisMapper.queryByParentId(parentWorkProcessId, parentTaskId);
} }
@Override @Override
public List<Integer> querySubWorkflowInstanceIds(int workflowInstanceId) { public List<Integer> querySubWorkflowInstanceIds(int workflowInstanceId) {
return processInstanceMapMapper.querySubIdListByParentId(workflowInstanceId); return mybatisMapper.querySubIdListByParentId(workflowInstanceId);
} }
@Override @Override
public void deleteByParentId(int workflowInstanceId) { public void deleteByParentId(int workflowInstanceId) {
processInstanceMapMapper.deleteByParentId(workflowInstanceId); mybatisMapper.deleteByParentId(workflowInstanceId);
} }
} }

26
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java

@ -19,31 +19,41 @@ package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao; import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List; import java.util.List;
import org.springframework.beans.factory.annotation.Autowired; import lombok.NonNull;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
@Repository @Repository
public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao { public class ProcessTaskRelationLogDaoImpl extends BaseDao<ProcessTaskRelationLog, ProcessTaskRelationLogMapper>
implements
ProcessTaskRelationLogDao {
@Autowired public ProcessTaskRelationLogDaoImpl(@NonNull ProcessTaskRelationLogMapper processTaskRelationLogMapper) {
private ProcessTaskRelationLogMapper processTaskRelationLogMapper; super(processTaskRelationLogMapper);
}
@Override @Override
public List<ProcessTaskRelationLog> findByWorkflowDefinitionCode(long workflowDefinitionCode) { public List<ProcessTaskRelationLog> queryByWorkflowDefinitionCode(long workflowDefinitionCode) {
return processTaskRelationLogMapper.queryByProcessCode(workflowDefinitionCode); return mybatisMapper.queryByProcessCode(workflowDefinitionCode);
} }
@Override @Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processTaskRelationLogMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode); mybatisMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode);
} }
@Override @Override
public int batchInsert(List<ProcessTaskRelationLog> taskRelationList) { public int batchInsert(List<ProcessTaskRelationLog> taskRelationList) {
return processTaskRelationLogMapper.batchInsert(taskRelationList); if (CollectionUtils.isEmpty(taskRelationList)) {
return 0;
}
return mybatisMapper.batchInsert(taskRelationList);
} }
} }

13
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java

@ -19,22 +19,25 @@ package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ProjectDao; import org.apache.dolphinscheduler.dao.repository.ProjectDao;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.springframework.beans.factory.annotation.Autowired; import lombok.NonNull;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
@Repository @Repository
public class ProjectDaoImpl implements ProjectDao { public class ProjectDaoImpl extends BaseDao<Project, ProjectMapper> implements ProjectDao {
@Autowired public ProjectDaoImpl(@NonNull ProjectMapper projectMapper) {
private ProjectMapper projectMapper; super(projectMapper);
}
@Override @Override
public List<Project> queryByCodes(Collection<Long> projectCodes) { public List<Project> queryByCodes(Collection<Long> projectCodes) {
return projectMapper.queryByCodes(projectCodes); return mybatisMapper.queryByCodes(projectCodes);
} }
} }

30
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
@ -32,10 +33,11 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -48,7 +50,7 @@ import com.google.common.collect.Lists;
*/ */
@Repository @Repository
@Slf4j @Slf4j
public class TaskDefinitionDaoImpl implements TaskDefinitionDao { public class TaskDefinitionDaoImpl extends BaseDao<TaskDefinition, TaskDefinitionMapper> implements TaskDefinitionDao {
@Autowired @Autowired
private ProcessDefinitionMapper processDefinitionMapper; private ProcessDefinitionMapper processDefinitionMapper;
@ -59,8 +61,9 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao {
@Autowired @Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper; private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired public TaskDefinitionDaoImpl(@NonNull TaskDefinitionMapper taskDefinitionMapper) {
private TaskDefinitionMapper taskDefinitionMapper; super(taskDefinitionMapper);
}
@Override @Override
public List<TaskDefinition> getTaskDefinitionListByDefinition(long processDefinitionCode) { public List<TaskDefinition> getTaskDefinitionListByDefinition(long processDefinitionCode) {
@ -70,11 +73,13 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao {
return Lists.newArrayList(); return Lists.newArrayList();
} }
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); processDefinition.getCode(), processDefinition.getVersion());
Set<TaskDefinition> taskDefinitionSet = new HashSet<>(); Set<TaskDefinition> taskDefinitionSet = processTaskRelations
processTaskRelations.stream().filter(p -> p.getPostTaskCode() > 0) .stream()
.forEach(p -> taskDefinitionSet.add(new TaskDefinition(p.getPostTaskCode(), p.getPostTaskVersion()))); .filter(p -> p.getPostTaskCode() > 0)
.map(p -> new TaskDefinition(p.getPostTaskCode(), p.getPostTaskVersion()))
.collect(Collectors.toSet());
if (taskDefinitionSet.isEmpty()) { if (taskDefinitionSet.isEmpty()) {
return Lists.newArrayList(); return Lists.newArrayList();
@ -90,8 +95,7 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao {
@Override @Override
public void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion) { public void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion) {
taskDefinitionMapper.deleteByWorkflowDefinitionCodeAndVersion(workflowDefinitionCode, mybatisMapper.deleteByWorkflowDefinitionCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion);
workflowDefinitionVersion);
} }
@Override @Override
@ -99,7 +103,7 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao {
if (CollectionUtils.isEmpty(needToDeleteTaskDefinitionCodes)) { if (CollectionUtils.isEmpty(needToDeleteTaskDefinitionCodes)) {
return; return;
} }
taskDefinitionMapper.deleteByBatchCodes(new ArrayList<>(needToDeleteTaskDefinitionCodes)); mybatisMapper.deleteByBatchCodes(new ArrayList<>(needToDeleteTaskDefinitionCodes));
} }
@Override @Override
@ -107,7 +111,7 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao {
if (CollectionUtils.isEmpty(taskDefinitionCodes)) { if (CollectionUtils.isEmpty(taskDefinitionCodes)) {
return Collections.emptyList(); return Collections.emptyList();
} }
return taskDefinitionMapper.queryByCodeList(taskDefinitionCodes); return mybatisMapper.queryByCodeList(taskDefinitionCodes);
} }
} }

75
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java

@ -20,71 +20,64 @@ package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import com.google.common.collect.Lists;
/** /**
* Task Definition Log DAP implementation * Task Definition Log DAP implementation
*/ */
@Repository @Repository
public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao { public class TaskDefinitionLogDaoImpl extends BaseDao<TaskDefinitionLog, TaskDefinitionLogMapper>
implements
TaskDefinitionLogDao {
@Autowired @Autowired
private TaskDefinitionDao taskDefinitionDao; private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired public TaskDefinitionLogDaoImpl(@NonNull TaskDefinitionLogMapper taskDefinitionLogMapper) {
private TaskDefinitionLogMapper taskDefinitionLogMapper; super(taskDefinitionLogMapper);
}
@Override @Override
public List<TaskDefinitionLog> getTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations) { public List<TaskDefinitionLog> queryByWorkflowDefinitionCodeAndVersion(Long workflowDefinitionCode,
Set<TaskDefinition> taskDefinitionSet = new HashSet<>(); Integer workflowDefinitionVersion) {
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) { List<ProcessTaskRelation> processTaskRelationLogs = processTaskRelationLogMapper
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), .queryByProcessCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion)
processTaskRelation.getPreTaskVersion())); .stream()
} .map(p -> (ProcessTaskRelation) p)
if (processTaskRelation.getPostTaskCode() > 0) { .collect(Collectors.toList());
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), return queryTaskDefineLogList(processTaskRelationLogs);
processTaskRelation.getPostTaskVersion()));
}
}
if (taskDefinitionSet.isEmpty()) {
return Lists.newArrayList();
}
return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
} }
@Override @Override
public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) { public List<TaskDefinitionLog> queryTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations) {
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>(); if (CollectionUtils.isEmpty(processTaskRelations)) {
Map<Long, Integer> taskCodeVersionMap = new HashMap<>(); return Collections.emptyList();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
}
if (processTaskRelation.getPostTaskCode() > 0) {
taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
} }
Set<TaskDefinition> taskDefinitionSet = processTaskRelations.stream()
.filter(p -> p.getPostTaskCode() > 0)
.map(p -> new TaskDefinition(p.getPostTaskCode(), p.getPostTaskVersion()))
.collect(Collectors.toSet());
if (CollectionUtils.isEmpty(taskDefinitionSet)) {
return Collections.emptyList();
} }
taskCodeVersionMap.forEach((code, version) -> { return mybatisMapper.queryByTaskDefinitions(taskDefinitionSet);
taskDefinitionLogs.add((TaskDefinitionLog) taskDefinitionDao.findTaskDefinition(code, version));
});
return taskDefinitionLogs;
} }
@Override @Override
@ -92,6 +85,6 @@ public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao {
if (CollectionUtils.isEmpty(taskDefinitionCodes)) { if (CollectionUtils.isEmpty(taskDefinitionCodes)) {
return; return;
} }
taskDefinitionLogMapper.deleteByTaskDefinitionCodes(taskDefinitionCodes); mybatisMapper.deleteByTaskDefinitionCodes(taskDefinitionCodes);
} }
} }

71
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java

@ -24,17 +24,16 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -45,38 +44,24 @@ import org.springframework.stereotype.Repository;
*/ */
@Repository @Repository
@Slf4j @Slf4j
public class TaskInstanceDaoImpl implements TaskInstanceDao { public class TaskInstanceDaoImpl extends BaseDao<TaskInstance, TaskInstanceMapper> implements TaskInstanceDao {
@Autowired
private TaskInstanceMapper taskInstanceMapper;
@Autowired @Autowired
private ProcessInstanceMapper processInstanceMapper; private ProcessInstanceMapper processInstanceMapper;
@Autowired public TaskInstanceDaoImpl(@NonNull TaskInstanceMapper taskInstanceMapper) {
private ProcessInstanceMapDao processInstanceMapDao; super(taskInstanceMapper);
}
@Override @Override
public boolean upsertTaskInstance(TaskInstance taskInstance) { public boolean upsertTaskInstance(TaskInstance taskInstance) {
if (taskInstance.getId() != null) { if (taskInstance.getId() != null) {
return updateTaskInstance(taskInstance); return updateById(taskInstance);
} else { } else {
return insertTaskInstance(taskInstance); return insert(taskInstance) > 0;
} }
} }
@Override
public boolean insertTaskInstance(TaskInstance taskInstance) {
int count = taskInstanceMapper.insert(taskInstance);
return count > 0;
}
@Override
public boolean updateTaskInstance(TaskInstance taskInstance) {
int count = taskInstanceMapper.updateById(taskInstance);
return count > 0;
}
@Override @Override
public boolean submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) { public boolean submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
WorkflowExecutionStatus processInstanceState = processInstance.getState(); WorkflowExecutionStatus processInstanceState = processInstance.getState();
@ -128,7 +113,8 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
return true; return true;
} }
List<TaskInstance> taskInstances = List<TaskInstance> taskInstances =
this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag()); this.queryValidTaskListByWorkflowInstanceId(taskInstance.getProcessInstanceId(),
taskInstance.getTestFlag());
for (TaskInstance task : taskInstances) { for (TaskInstance task : taskInstances) {
if (task.getState() == TaskExecutionStatus.FAILURE if (task.getState() == TaskExecutionStatus.FAILURE
@ -140,39 +126,34 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
} }
@Override @Override
public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag) { public List<TaskInstance> queryValidTaskListByWorkflowInstanceId(Integer processInstanceId, int testFlag) {
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag); return mybatisMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag);
} }
@Override @Override
public TaskInstance findTaskByInstanceIdAndCode(Integer processInstanceId, Long taskCode) { public TaskInstance queryByWorkflowInstanceIdAndTaskCode(Integer processInstanceId, Long taskCode) {
return taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, taskCode); return mybatisMapper.queryByInstanceIdAndCode(processInstanceId, taskCode);
} }
@Override @Override
public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) { public List<TaskInstance> queryPreviousTaskListByWorkflowInstanceId(Integer processInstanceId) {
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO, return mybatisMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
processInstance.getTestFlag()); processInstance.getTestFlag());
} }
@Override @Override
public TaskInstance findTaskInstanceById(Integer taskId) { public TaskInstance queryByCacheKey(String cacheKey) {
return taskInstanceMapper.selectById(taskId);
}
@Override
public TaskInstance findTaskInstanceByCacheKey(String cacheKey) {
if (StringUtils.isEmpty(cacheKey)) { if (StringUtils.isEmpty(cacheKey)) {
return null; return null;
} }
return taskInstanceMapper.queryByCacheKey(cacheKey); return mybatisMapper.queryByCacheKey(cacheKey);
} }
@Override @Override
public Boolean clearCacheByCacheKey(String cacheKey) { public Boolean clearCacheByCacheKey(String cacheKey) {
try { try {
taskInstanceMapper.clearCacheByCacheKey(cacheKey); mybatisMapper.clearCacheByCacheKey(cacheKey);
return true; return true;
} catch (Exception e) { } catch (Exception e) {
log.error("clear cache by cacheKey failed", e); log.error("clear cache by cacheKey failed", e);
@ -180,22 +161,14 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
} }
} }
@Override
public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
if (CollectionUtils.isEmpty(idList)) {
return new ArrayList<>();
}
return taskInstanceMapper.selectBatchIds(idList);
}
@Override @Override
public void deleteByWorkflowInstanceId(int workflowInstanceId) { public void deleteByWorkflowInstanceId(int workflowInstanceId) {
taskInstanceMapper.deleteByWorkflowInstanceId(workflowInstanceId); mybatisMapper.deleteByWorkflowInstanceId(workflowInstanceId);
} }
@Override @Override
public List<TaskInstance> findTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) { public List<TaskInstance> queryByWorkflowInstanceId(Integer workflowInstanceId) {
return taskInstanceMapper.findByWorkflowInstanceId(workflowInstanceId); return mybatisMapper.findByWorkflowInstanceId(workflowInstanceId);
} }
} }

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java

@ -73,7 +73,7 @@ public class TaskCacheEventHandler implements TaskEventHandler {
TaskInstance taskInstance = taskInstanceOptional.get(); TaskInstance taskInstance = taskInstanceOptional.get();
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceById(taskEvent.getCacheTaskInstanceId()); TaskInstance cacheTaskInstance = taskInstanceDao.queryById(taskEvent.getCacheTaskInstanceId());
// keep the task instance fields // keep the task instance fields
cacheTaskInstance.setId(taskInstance.getId()); cacheTaskInstance.setId(taskInstance.getId());
@ -90,7 +90,7 @@ public class TaskCacheEventHandler implements TaskEventHandler {
processService.changeOutParam(taskInstance); processService.changeOutParam(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
TaskStateEvent stateEvent = TaskStateEvent.builder() TaskStateEvent stateEvent = TaskStateEvent.builder()
.processInstanceId(taskEvent.getProcessInstanceId()) .processInstanceId(taskEvent.getProcessInstanceId())
.taskInstanceId(taskEvent.getTaskInstanceId()) .taskInstanceId(taskEvent.getTaskInstanceId())

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java

@ -88,7 +88,7 @@ public class TaskDelayEventHandler implements TaskEventHandler {
taskInstance.setExecutePath(taskEvent.getExecutePath()); taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId()); taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds()); taskInstance.setAppLink(taskEvent.getAppIds());
if (!taskInstanceDao.updateTaskInstance(taskInstance)) { if (!taskInstanceDao.updateById(taskInstance)) {
throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed"); throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed");
} }
sendAckToWorker(taskEvent); sendAckToWorker(taskEvent);

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java

@ -67,7 +67,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler {
taskInstance.setState(TaskExecutionStatus.DISPATCH); taskInstance.setState(TaskExecutionStatus.DISPATCH);
taskInstance.setHost(taskEvent.getWorkerAddress()); taskInstance.setHost(taskEvent.getWorkerAddress());
try { try {
if (!taskInstanceDao.updateTaskInstance(taskInstance)) { if (!taskInstanceDao.updateById(taskInstance)) {
throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed"); throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed");
} }
} catch (Exception ex) { } catch (Exception ex) {

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java

@ -98,7 +98,7 @@ public class TaskResultEventHandler implements TaskEventHandler {
taskInstance.setEndTime(taskEvent.getEndTime()); taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool()); taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance); processService.changeOutParam(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
sendAckToWorker(taskEvent); sendAckToWorker(taskEvent);
} catch (Exception ex) { } catch (Exception ex) {
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java

@ -84,7 +84,7 @@ public class TaskRunningEventHandler implements TaskEventHandler {
taskInstance.setExecutePath(taskEvent.getExecutePath()); taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId()); taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds()); taskInstance.setAppLink(taskEvent.getAppIds());
if (!taskInstanceDao.updateTaskInstance(taskInstance)) { if (!taskInstanceDao.updateById(taskInstance)) {
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed"); throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed");
} }
sendAckToWorker(taskEvent); sendAckToWorker(taskEvent);

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java

@ -75,7 +75,7 @@ public class TaskUpdatePidEventHandler implements TaskEventHandler {
taskInstance.setStartTime(taskEvent.getStartTime()); taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress()); taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setPid(taskEvent.getProcessId()); taskInstance.setPid(taskEvent.getProcessId());
if (!taskInstanceDao.updateTaskInstance(taskInstance)) { if (!taskInstanceDao.updateById(taskInstance)) {
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed"); throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed");
} }
sendAckToWorker(taskEvent); sendAckToWorker(taskEvent);

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java

@ -394,7 +394,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
taskInstance.setEndTime(taskEvent.getEndTime()); taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool()); taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance); processService.changeOutParam(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
// send ack // send ack
sendAckToWorker(taskEvent); sendAckToWorker(taskEvent);

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -390,7 +390,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
log.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId()); log.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId());
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId()); TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId());
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = DefaultTaskExecuteRunnable defaultTaskExecuteRunnable =
taskExecuteRunnableMap.get(taskInstance.getTaskCode()); taskExecuteRunnableMap.get(taskInstance.getTaskCode());
@ -409,7 +409,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
log.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId()); log.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId());
boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue); boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
if (acquireTaskGroup) { if (acquireTaskGroup) {
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId()); TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId());
taskExecuteRunnableMap.get(taskInstance.getTaskCode()).dispatch(); taskExecuteRunnableMap.get(taskInstance.getTaskCode()).dispatch();
log.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId()); log.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId());
return true; return true;
@ -573,7 +573,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
*/ */
public void refreshTaskInstance(int taskInstanceId) { public void refreshTaskInstance(int taskInstanceId) {
log.info("task instance update: {} ", taskInstanceId); log.info("task instance update: {} ", taskInstanceId);
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); TaskInstance taskInstance = taskInstanceDao.queryById(taskInstanceId);
if (taskInstance == null) { if (taskInstance == null) {
log.error("can not find task instance, id:{}", taskInstanceId); log.error("can not find task instance, id:{}", taskInstanceId);
return; return;
@ -837,7 +837,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
List<ProcessTaskRelation> processTaskRelations = List<ProcessTaskRelation> processTaskRelations =
processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogs = List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionLogDao.getTaskDefineLogListByRelation(processTaskRelations); taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);
List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
forbiddenTaskMap.clear(); forbiddenTaskMap.clear();
@ -880,7 +880,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
processInstance.getRunTimes(), processInstance.getRunTimes(),
processInstance.getRecovery()); processInstance.getRecovery());
List<TaskInstance> validTaskInstanceList = List<TaskInstance> validTaskInstanceList =
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(),
processInstance.getTestFlag()); processInstance.getTestFlag());
for (TaskInstance task : validTaskInstanceList) { for (TaskInstance task : validTaskInstanceList) {
try ( try (
@ -898,7 +898,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) { if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) {
task.setFlag(Flag.NO); task.setFlag(Flag.NO);
taskInstanceDao.updateTaskInstance(task); taskInstanceDao.updateById(task);
continue; continue;
} }
} }
@ -919,7 +919,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
if (task.getState().isNeedFaultTolerance()) { if (task.getState().isNeedFaultTolerance()) {
log.info("TaskInstance needs fault tolerance, will be added to standby list."); log.info("TaskInstance needs fault tolerance, will be added to standby list.");
task.setFlag(Flag.NO); task.setFlag(Flag.NO);
taskInstanceDao.updateTaskInstance(task); taskInstanceDao.updateById(task);
// tolerantTaskInstance add to standby list directly // tolerantTaskInstance add to standby list directly
TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task); TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
@ -974,7 +974,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
processInstance.getScheduleTime(), processInstance.getScheduleTime(),
cmdParam.get(Constants.SCHEDULE_TIMEZONE)); cmdParam.get(Constants.SCHEDULE_TIMEZONE));
processInstance.setGlobalParams(globalParams); processInstance.setGlobalParams(globalParams);
processInstanceDao.updateProcessInstance(processInstance); processInstanceDao.updateById(processInstance);
} }
} }
} }
@ -1008,7 +1008,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
if (taskInstance.getId() != oldTaskInstanceId) { if (taskInstance.getId() != oldTaskInstanceId) {
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
oldTaskInstance.setFlag(Flag.NO); oldTaskInstance.setFlag(Flag.NO);
taskInstanceDao.updateTaskInstance(oldTaskInstance); taskInstanceDao.updateById(oldTaskInstance);
validTaskMap.remove(taskInstance.getTaskCode()); validTaskMap.remove(taskInstance.getTaskCode());
taskExecuteRunnableMap.remove(taskInstance.getTaskCode()); taskExecuteRunnableMap.remove(taskInstance.getTaskCode());
} }
@ -1350,7 +1350,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
existTaskInstance.setFlag(Flag.NO); existTaskInstance.setFlag(Flag.NO);
existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE); existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
validTaskMap.remove(existTaskInstance.getTaskCode()); validTaskMap.remove(existTaskInstance.getTaskCode());
taskInstanceDao.updateTaskInstance(existTaskInstance); taskInstanceDao.updateById(existTaskInstance);
existTaskInstance = cloneTolerantTaskInstance(existTaskInstance); existTaskInstance = cloneTolerantTaskInstance(existTaskInstance);
log.info("task {} cannot be take over will generate a tolerant task instance", log.info("task {} cannot be take over will generate a tolerant task instance",
existTaskInstance.getName()); existTaskInstance.getName());
@ -1784,7 +1784,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
processInstance.setEndTime(new Date()); processInstance.setEndTime(new Date());
} }
try { try {
processInstanceDao.updateProcessInstance(processInstance); processInstanceDao.updateById(processInstance);
} catch (Exception ex) { } catch (Exception ex) {
// recover the status // recover the status
processInstance.setStateWithDesc(originStates, "recover state by DB error"); processInstance.setStateWithDesc(originStates, "recover state by DB error");
@ -1866,7 +1866,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
try ( try (
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = final LogUtils.MDCAutoClosableContext mdcAutoClosableContext =
LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId)) { LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId)) {
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); TaskInstance taskInstance = taskInstanceDao.queryById(taskInstanceId);
if (taskInstance == null || taskInstance.getState().isFinished()) { if (taskInstance == null || taskInstance.getState().isFinished()) {
continue; continue;
} }
@ -1897,7 +1897,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
while ((task = readyToSubmitTaskQueue.peek()) != null) { while ((task = readyToSubmitTaskQueue.peek()) != null) {
// stop tasks which is retrying if forced success happens // stop tasks which is retrying if forced success happens
if (task.taskCanRetry()) { if (task.taskCanRetry()) {
TaskInstance retryTask = taskInstanceDao.findTaskInstanceById(task.getId()); TaskInstance retryTask = taskInstanceDao.queryById(task.getId());
if (retryTask != null && retryTask.getState().isForceSuccess()) { if (retryTask != null && retryTask.getState().isForceSuccess()) {
task.setState(retryTask.getState()); task.setState(retryTask.getState());
log.info( log.info(
@ -1975,7 +1975,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
.map(Integer::valueOf) .map(Integer::valueOf)
.collect(Collectors.toList()); .collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) { if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) {
return taskInstanceDao.findTaskInstanceByIdList(startTaskInstanceIds); return taskInstanceDao.queryByIds(startTaskInstanceIds);
} }
} }
return Collections.emptyList(); return Collections.emptyList();
@ -2137,7 +2137,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
if (validTaskMap.containsKey(taskCode)) { if (validTaskMap.containsKey(taskCode)) {
taskInstance = taskInstanceMap.get(validTaskMap.get(taskCode)); taskInstance = taskInstanceMap.get(validTaskMap.get(taskCode));
} else { } else {
taskInstance = taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskCode); taskInstance = taskInstanceDao.queryByWorkflowInstanceIdAndTaskCode(processInstance.getId(), taskCode);
} }
if (taskInstance == null) { if (taskInstance == null) {
continue; continue;
@ -2147,7 +2147,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
for (TaskInstance taskInstance : removeTaskInstances) { for (TaskInstance taskInstance : removeTaskInstances) {
taskInstance.setFlag(Flag.NO); taskInstance.setFlag(Flag.NO);
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
} }
Set<String> removeSet = new HashSet<>(); Set<String> removeSet = new HashSet<>();
@ -2173,7 +2173,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
.collect(Collectors.toList()); .collect(Collectors.toList());
processInstance.setVarPool(JSONUtils.toJsonString(processProperties)); processInstance.setVarPool(JSONUtils.toJsonString(processProperties));
processInstanceDao.updateProcessInstance(processInstance); processInstanceDao.updateById(processInstance);
// remove task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap // remove task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap
taskInstanceMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getValue().getTaskCode()))); taskInstanceMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getValue().getTaskCode())));
@ -2188,7 +2188,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
if (taskId.equals(taskInstance.getId())) { if (taskId.equals(taskInstance.getId())) {
taskInstance.setCacheKey(taskIdAndCacheKey.getRight()); taskInstance.setCacheKey(taskIdAndCacheKey.getRight());
try { try {
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
} catch (Exception e) { } catch (Exception e) {
log.error("update task instance cache key failed", e); log.error("update task instance cache key failed", e);
} }

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskKillOperator.java

@ -65,7 +65,7 @@ public class TaskKillOperator implements TaskOperator {
private void killTaskInstanceInDB(TaskInstance taskInstance) { private void killTaskInstanceInDB(TaskInstance taskInstance) {
taskInstance.setState(TaskExecutionStatus.KILL); taskInstance.setState(TaskExecutionStatus.KILL);
taskInstance.setEndTime(new Date()); taskInstance.setEndTime(new Date());
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
} }
private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance) throws RemotingException { private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance) throws RemotingException {

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java

@ -73,7 +73,7 @@ public class TaskTimeoutOperator implements TaskOperator {
private void timeoutTaskInstanceInDB(TaskInstance taskInstance) { private void timeoutTaskInstanceInDB(TaskInstance taskInstance) {
taskInstance.setState(TaskExecutionStatus.FAILURE); taskInstance.setState(TaskExecutionStatus.FAILURE);
taskInstance.setEndTime(new Date()); taskInstance.setEndTime(new Date());
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
} }
private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance) throws RemotingException { private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance) throws RemotingException {

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java

@ -91,7 +91,7 @@ public class BlockingLogicTask extends BaseSyncLogicTask<BlockingParameters> {
private DependResult calculateConditionResult() throws MasterTaskExecuteException { private DependResult calculateConditionResult() throws MasterTaskExecuteException {
// todo: Directly get the task instance from the cache // todo: Directly get the task instance from the cache
Map<Long, TaskInstance> completeTaskList = taskInstanceDao Map<Long, TaskInstance> completeTaskList = taskInstanceDao
.findValidTaskListByProcessId(taskExecutionContext.getProcessInstanceId(), .queryValidTaskListByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTestFlag()) taskExecutionContext.getTestFlag())
.stream() .stream()
.collect(Collectors.toMap(TaskInstance::getTaskCode, Function.identity())); .collect(Collectors.toMap(TaskInstance::getTaskCode, Function.identity()));

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java

@ -75,9 +75,10 @@ public class ConditionLogicTask extends BaseSyncLogicTask<DependentParameters> {
private DependResult calculateConditionResult() { private DependResult calculateConditionResult() {
final ProcessInstance processInstance = final ProcessInstance processInstance =
workflowInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId()); workflowInstanceDao.queryById(taskExecutionContext.getProcessInstanceId());
final List<TaskInstance> taskInstances = final List<TaskInstance> taskInstances =
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(),
processInstance.getTestFlag());
final Map<Long, TaskInstance> taskInstanceMap = final Map<Long, TaskInstance> taskInstanceMap =
taskInstances.stream().collect(Collectors.toMap(TaskInstance::getTaskCode, Function.identity())); taskInstances.stream().collect(Collectors.toMap(TaskInstance::getTaskCode, Function.identity()));

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java

@ -84,7 +84,7 @@ public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFuncti
this.taskDefinitionDao = taskDefinitionDao; this.taskDefinitionDao = taskDefinitionDao;
this.taskInstanceDao = taskInstanceDao; this.taskInstanceDao = taskInstanceDao;
this.processInstance = this.processInstance =
processInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId()); processInstanceDao.queryById(taskExecutionContext.getProcessInstanceId());
this.dependentDate = calculateDependentDate(); this.dependentDate = calculateDependentDate();
this.dependentTaskList = initializeDependentTaskList(); this.dependentTaskList = initializeDependentTaskList();
log.info("Initialized dependent task list successfully"); log.info("Initialized dependent task list successfully");
@ -132,7 +132,7 @@ public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFuncti
final Map<Long, TaskDefinition> taskDefinitionMap = taskDefinitionDao.queryByCodes(taskDefinitionCodes).stream() final Map<Long, TaskDefinition> taskDefinitionMap = taskDefinitionDao.queryByCodes(taskDefinitionCodes).stream()
.collect(Collectors.toMap(TaskDefinition::getCode, Function.identity())); .collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
final TaskInstance taskInstance = final TaskInstance taskInstance =
taskInstanceDao.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); taskInstanceDao.queryById(taskExecutionContext.getTaskInstanceId());
List<DependentExecute> dependentExecutes = dependentParameters.getDependTaskList() List<DependentExecute> dependentExecutes = dependentParameters.getDependTaskList()
.stream() .stream()
.map(dependentTaskModel -> { .map(dependentTaskModel -> {

13
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java

@ -101,9 +101,8 @@ public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {
this.processDefineMapper = processDefineMapper; this.processDefineMapper = processDefineMapper;
this.commandMapper = commandMapper; this.commandMapper = commandMapper;
this.processInstance = this.processInstance = processInstanceDao.queryById(taskExecutionContext.getProcessInstanceId());
processInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId()); this.taskInstance = taskInstanceDao.queryById(taskExecutionContext.getTaskInstanceId());
this.taskInstance = taskInstanceDao.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
} }
@Override @Override
@ -134,7 +133,7 @@ public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {
case REPEAT_RUNNING: case REPEAT_RUNNING:
existsSubProcessInstanceList.forEach(processInstance -> { existsSubProcessInstanceList.forEach(processInstance -> {
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN); processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
processInstanceDao.updateProcessInstance(processInstance); processInstanceDao.updateById(processInstance);
}); });
break; break;
case START_FAILURE_TASK_PROCESS: case START_FAILURE_TASK_PROCESS:
@ -143,7 +142,7 @@ public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {
subWorkflowService.filterFailedProcessInstances(existsSubProcessInstanceList); subWorkflowService.filterFailedProcessInstances(existsSubProcessInstanceList);
failedProcessInstances.forEach(processInstance -> { failedProcessInstances.forEach(processInstance -> {
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN); processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
processInstanceDao.updateProcessInstance(processInstance); processInstanceDao.updateById(processInstance);
}); });
break; break;
} }
@ -165,7 +164,7 @@ public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {
dynamicStartParams); dynamicStartParams);
ProcessInstance subProcessInstance = createSubProcessInstance(command); ProcessInstance subProcessInstance = createSubProcessInstance(command);
subProcessInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN); subProcessInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
processInstanceDao.insertProcessInstance(subProcessInstance); processInstanceDao.insert(subProcessInstance);
command.setProcessInstanceId(subProcessInstance.getId()); command.setProcessInstanceId(subProcessInstance.getId());
processInstanceList.add(subProcessInstance); processInstanceList.add(subProcessInstance);
} }
@ -273,7 +272,7 @@ public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {
subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList); subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList);
for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) { for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) {
subProcessInstance.setState(stopStatus); subProcessInstance.setState(stopStatus);
processInstanceDao.updateProcessInstance(subProcessInstance); processInstanceDao.updateById(subProcessInstance);
if (subProcessInstance.getState().isFinished()) { if (subProcessInstance.getState().isFinished()) {
log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId()); log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId());
return; return;

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowAsyncTaskExecuteFunction.java

@ -46,14 +46,14 @@ public class SubWorkflowAsyncTaskExecuteFunction implements AsyncTaskExecuteFunc
public @NonNull AsyncTaskExecutionStatus getAsyncTaskExecutionStatus() { public @NonNull AsyncTaskExecutionStatus getAsyncTaskExecutionStatus() {
// query the status of sub workflow instance // query the status of sub workflow instance
if (subWorkflowInstance == null) { if (subWorkflowInstance == null) {
subWorkflowInstance = processInstanceDao.findSubProcessInstanceByParentId( subWorkflowInstance = processInstanceDao.querySubProcessInstanceByParentId(
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
} }
if (subWorkflowInstance == null) { if (subWorkflowInstance == null) {
log.info("The sub workflow instance doesn't created"); log.info("The sub workflow instance doesn't created");
return AsyncTaskExecutionStatus.RUNNING; return AsyncTaskExecutionStatus.RUNNING;
} }
subWorkflowInstance = processInstanceDao.queryByWorkflowInstanceId(subWorkflowInstance.getId()); subWorkflowInstance = processInstanceDao.queryById(subWorkflowInstance.getId());
if (subWorkflowInstance != null && subWorkflowInstance.getState().isFinished()) { if (subWorkflowInstance != null && subWorkflowInstance.getState().isFinished()) {
return subWorkflowInstance.getState().isSuccess() ? AsyncTaskExecutionStatus.SUCCESS return subWorkflowInstance.getState().isSuccess() ? AsyncTaskExecutionStatus.SUCCESS
: AsyncTaskExecutionStatus.FAILED; : AsyncTaskExecutionStatus.FAILED;

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java

@ -73,7 +73,7 @@ public class SubWorkflowLogicTask extends BaseAsyncLogicTask<SubProcessParameter
return; return;
} }
ProcessInstance subProcessInstance = ProcessInstance subProcessInstance =
processInstanceDao.findSubProcessInstanceByParentId(taskExecutionContext.getProcessInstanceId(), processInstanceDao.querySubProcessInstanceByParentId(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getTaskInstanceId());
if (subProcessInstance == null) { if (subProcessInstance == null) {
log.info("SubWorkflow instance is null"); log.info("SubWorkflow instance is null");
@ -91,7 +91,7 @@ public class SubWorkflowLogicTask extends BaseAsyncLogicTask<SubProcessParameter
return; return;
} }
subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE, "ready pause sub workflow"); subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE, "ready pause sub workflow");
processInstanceDao.updateProcessInstance(subProcessInstance); processInstanceDao.updateById(subProcessInstance);
try { try {
sendToSubProcess(taskExecutionContext, subProcessInstance); sendToSubProcess(taskExecutionContext, subProcessInstance);
log.info("Success send pause request to SubWorkflow's master: {}", subProcessInstance.getHost()); log.info("Success send pause request to SubWorkflow's master: {}", subProcessInstance.getHost());
@ -110,7 +110,7 @@ public class SubWorkflowLogicTask extends BaseAsyncLogicTask<SubProcessParameter
return; return;
} }
ProcessInstance subProcessInstance = ProcessInstance subProcessInstance =
processInstanceDao.findSubProcessInstanceByParentId(taskExecutionContext.getProcessInstanceId(), processInstanceDao.querySubProcessInstanceByParentId(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getTaskInstanceId());
if (subProcessInstance == null) { if (subProcessInstance == null) {
log.info("SubWorkflow instance is null"); log.info("SubWorkflow instance is null");
@ -128,7 +128,7 @@ public class SubWorkflowLogicTask extends BaseAsyncLogicTask<SubProcessParameter
return; return;
} }
subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by kill task"); subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by kill task");
processInstanceDao.updateProcessInstance(subProcessInstance); processInstanceDao.updateById(subProcessInstance);
try { try {
sendToSubProcess(taskExecutionContext, subProcessInstance); sendToSubProcess(taskExecutionContext, subProcessInstance);
log.info("Success send kill request to SubWorkflow's master: {}", subProcessInstance.getHost()); log.info("Success send kill request to SubWorkflow's master: {}", subProcessInstance.getHost());

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

@ -156,7 +156,7 @@ public class DependentExecute {
DependResult result; DependResult result;
TaskInstance taskInstance = null; TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList = List<TaskInstance> taskInstanceList =
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), testFlag); taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), testFlag);
for (TaskInstance task : taskInstanceList) { for (TaskInstance task : taskInstanceList) {
if (task.getTaskCode() == taskCode) { if (task.getTaskCode() == taskCode) {
@ -192,10 +192,10 @@ public class DependentExecute {
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) { private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
ProcessInstance lastSchedulerProcess = ProcessInstance lastSchedulerProcess =
processInstanceDao.findLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag); processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
ProcessInstance lastManualProcess = ProcessInstance lastManualProcess =
processInstanceDao.findLastManualProcessInterval(definitionCode, dateInterval, testFlag); processInstanceDao.queryLastManualProcessInterval(definitionCode, dateInterval, testFlag);
if (lastManualProcess == null) { if (lastManualProcess == null) {
return lastSchedulerProcess; return lastSchedulerProcess;

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java

@ -80,7 +80,7 @@ class TaskCacheEventHandlerTest {
cacheTaskInstance.setProcessInstanceId(cacheProcessInstanceId); cacheTaskInstance.setProcessInstanceId(cacheProcessInstanceId);
cacheTaskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>())); cacheTaskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
Mockito.when(taskInstanceDao.findTaskInstanceById(cacheTaskInstanceId)).thenReturn(cacheTaskInstance); Mockito.when(taskInstanceDao.queryById(cacheTaskInstanceId)).thenReturn(cacheTaskInstance);
WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class); WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class);
Mockito.when(processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId)) Mockito.when(processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId))

8
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

@ -161,7 +161,7 @@ public class WorkflowExecuteRunnableTest {
taskInstance4.setId(4); taskInstance4.setId(4);
Map<String, String> cmdParam = new HashMap<>(); Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4"); cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
Mockito.when(taskInstanceDao.findTaskInstanceByIdList( Mockito.when(taskInstanceDao.queryByIds(
Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(),
taskInstance4.getId()))) taskInstance4.getId())))
.thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4)); .thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
@ -308,9 +308,11 @@ public class WorkflowExecuteRunnableTest {
dagField.setAccessible(true); dagField.setAccessible(true);
dagField.set(workflowExecuteThread, dag); dagField.set(workflowExecuteThread, dag);
Mockito.when(taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskInstance1.getTaskCode())) Mockito.when(taskInstanceDao.queryByWorkflowInstanceIdAndTaskCode(processInstance.getId(),
taskInstance1.getTaskCode()))
.thenReturn(taskInstance1); .thenReturn(taskInstance1);
Mockito.when(taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskInstance2.getTaskCode())) Mockito.when(taskInstanceDao.queryByWorkflowInstanceIdAndTaskCode(processInstance.getId(),
taskInstance2.getTaskCode()))
.thenReturn(null); .thenReturn(null);
workflowExecuteThread.clearDataIfExecuteTask(); workflowExecuteThread.clearDataIfExecuteTask();

6
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskTest.java

@ -87,7 +87,7 @@ class DynamicLogicTaskTest {
taskExecutionContext = Mockito.mock(TaskExecutionContext.class); taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
objectMapper = new ObjectMapper(); objectMapper = new ObjectMapper();
processInstance = new ProcessInstance(); processInstance = new ProcessInstance();
Mockito.when(processInstanceDao.queryByWorkflowInstanceId(Mockito.any())).thenReturn(processInstance); Mockito.when(processInstanceDao.queryById(Mockito.any())).thenReturn(processInstance);
dynamicLogicTask = new DynamicLogicTask( dynamicLogicTask = new DynamicLogicTask(
taskExecutionContext, taskExecutionContext,
processInstanceDao, processInstanceDao,
@ -163,7 +163,7 @@ class DynamicLogicTaskTest {
dynamicLogicTask.resetProcessInstanceStatus(subProcessInstances); dynamicLogicTask.resetProcessInstanceStatus(subProcessInstances);
Mockito.verify(processInstanceDao).updateProcessInstance(subProcessInstance); Mockito.verify(processInstanceDao).updateById(subProcessInstance);
Assertions.assertEquals(WorkflowExecutionStatus.WAIT_TO_RUN, subProcessInstance.getState()); Assertions.assertEquals(WorkflowExecutionStatus.WAIT_TO_RUN, subProcessInstance.getState());
} }
@ -178,7 +178,7 @@ class DynamicLogicTaskTest {
dynamicLogicTask.resetProcessInstanceStatus(subProcessInstances); dynamicLogicTask.resetProcessInstanceStatus(subProcessInstances);
Mockito.verify(processInstanceDao).updateProcessInstance(failedSubProcessInstance); Mockito.verify(processInstanceDao).updateById(failedSubProcessInstance);
Assertions.assertEquals(WorkflowExecutionStatus.WAIT_TO_RUN, failedSubProcessInstance.getState()); Assertions.assertEquals(WorkflowExecutionStatus.WAIT_TO_RUN, failedSubProcessInstance.getState());
} }

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -154,7 +154,7 @@ public class FailoverServiceTest {
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())) given(processService.queryNeedFailoverProcessInstances(Mockito.anyString()))
.willReturn(Arrays.asList(processInstance)); .willReturn(Arrays.asList(processInstance));
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class)); doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
given(taskInstanceDao.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt())) given(taskInstanceDao.queryValidTaskListByWorkflowInstanceId(Mockito.anyInt(), Mockito.anyInt()))
.willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance)); .willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
Thread.sleep(1000); Thread.sleep(1000);

55
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -386,9 +386,9 @@ public class ProcessServiceImpl implements ProcessService {
info.setCommandType(CommandType.STOP); info.setCommandType(CommandType.STOP);
info.addHistoryCmd(CommandType.STOP); info.addHistoryCmd(CommandType.STOP);
info.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by serial_priority strategy"); info.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by serial_priority strategy");
int update = processInstanceDao.updateProcessInstance(info); boolean update = processInstanceDao.updateById(info);
// determine whether the process is normal // determine whether the process is normal
if (update > 0) { if (update) {
WorkflowStateEventChangeRequest workflowStateEventChangeRequest = WorkflowStateEventChangeRequest workflowStateEventChangeRequest =
new WorkflowStateEventChangeRequest( new WorkflowStateEventChangeRequest(
info.getId(), 0, info.getState(), info.getId(), 0); info.getId(), 0, info.getState(), info.getId(), 0);
@ -506,7 +506,7 @@ public class ProcessServiceImpl implements ProcessService {
*/ */
@Override @Override
public void removeTaskLogFile(Integer processInstanceId) { public void removeTaskLogFile(Integer processInstanceId) {
List<TaskInstance> taskInstanceList = taskInstanceDao.findTaskInstanceByWorkflowInstanceId(processInstanceId); List<TaskInstance> taskInstanceList = taskInstanceDao.queryByWorkflowInstanceId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)) { if (CollectionUtils.isEmpty(taskInstanceList)) {
return; return;
} }
@ -832,7 +832,7 @@ public class ProcessServiceImpl implements ProcessService {
failedList.addAll(killedList); failedList.addAll(killedList);
failedList.addAll(toleranceList); failedList.addAll(toleranceList);
for (Integer taskId : failedList) { for (Integer taskId : failedList) {
initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId)); initTaskInstance(taskInstanceDao.queryById(taskId));
} }
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(failedList))); String.join(Constants.COMMA, convertIntListToString(failedList)));
@ -850,7 +850,7 @@ public class ProcessServiceImpl implements ProcessService {
TaskExecutionStatus.KILL); TaskExecutionStatus.KILL);
for (Integer taskId : stopNodeList) { for (Integer taskId : stopNodeList) {
// initialize the pause state // initialize the pause state
initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId)); initTaskInstance(taskInstanceDao.queryById(taskId));
} }
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(stopNodeList))); String.join(Constants.COMMA, convertIntListToString(stopNodeList)));
@ -867,11 +867,11 @@ public class ProcessServiceImpl implements ProcessService {
// delete all the valid tasks when complement data if id is not null // delete all the valid tasks when complement data if id is not null
if (processInstance.getId() != null) { if (processInstance.getId() != null) {
List<TaskInstance> taskInstanceList = List<TaskInstance> taskInstanceList =
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(),
processInstance.getTestFlag()); processInstance.getTestFlag());
for (TaskInstance taskInstance : taskInstanceList) { for (TaskInstance taskInstance : taskInstanceList) {
taskInstance.setFlag(Flag.NO); taskInstance.setFlag(Flag.NO);
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
} }
} }
break; break;
@ -889,11 +889,11 @@ public class ProcessServiceImpl implements ProcessService {
} }
// delete all the valid tasks when repeat running // delete all the valid tasks when repeat running
List<TaskInstance> validTaskList = List<TaskInstance> validTaskList =
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(),
processInstance.getTestFlag()); processInstance.getTestFlag());
for (TaskInstance taskInstance : validTaskList) { for (TaskInstance taskInstance : validTaskList) {
taskInstance.setFlag(Flag.NO); taskInstance.setFlag(Flag.NO);
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
} }
processInstance.setStartTime(new Date()); processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime()); processInstance.setRestartTime(processInstance.getStartTime());
@ -1050,7 +1050,7 @@ public class ProcessServiceImpl implements ProcessService {
// update sub process id to process map table // update sub process id to process map table
processInstanceMap.setProcessInstanceId(subProcessInstance.getId()); processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
processInstanceMapDao.updateWorkProcessInstanceMap(processInstanceMap); processInstanceMapDao.updateById(processInstanceMap);
} }
/** /**
@ -1112,11 +1112,11 @@ public class ProcessServiceImpl implements ProcessService {
if (!taskInstance.isSubProcess() if (!taskInstance.isSubProcess()
&& (taskInstance.getState().isKill() || taskInstance.getState().isFailure())) { && (taskInstance.getState().isKill() || taskInstance.getState().isFailure())) {
taskInstance.setFlag(Flag.NO); taskInstance.setFlag(Flag.NO);
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
return; return;
} }
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS); taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
taskInstanceDao.updateTaskInstance(taskInstance); taskInstanceDao.updateById(taskInstance);
} }
/** /**
@ -1207,7 +1207,7 @@ public class ProcessServiceImpl implements ProcessService {
processMap = findPreviousTaskProcessMap(parentInstance, parentTask); processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
if (processMap != null) { if (processMap != null) {
processMap.setParentTaskInstanceId(parentTask.getId()); processMap.setParentTaskInstanceId(parentTask.getId());
processInstanceMapDao.updateWorkProcessInstanceMap(processMap); processInstanceMapDao.updateById(processMap);
return processMap; return processMap;
} }
} }
@ -1215,7 +1215,7 @@ public class ProcessServiceImpl implements ProcessService {
processMap = new ProcessInstanceMap(); processMap = new ProcessInstanceMap();
processMap.setParentProcessInstanceId(parentInstance.getId()); processMap.setParentProcessInstanceId(parentInstance.getId());
processMap.setParentTaskInstanceId(parentTask.getId()); processMap.setParentTaskInstanceId(parentTask.getId());
processInstanceMapDao.createWorkProcessInstanceMap(processMap); processInstanceMapDao.insert(processMap);
return processMap; return processMap;
} }
@ -1231,12 +1231,12 @@ public class ProcessServiceImpl implements ProcessService {
Integer preTaskId = 0; Integer preTaskId = 0;
List<TaskInstance> preTaskList = List<TaskInstance> preTaskList =
taskInstanceDao.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId()); taskInstanceDao.queryPreviousTaskListByWorkflowInstanceId(parentProcessInstance.getId());
for (TaskInstance task : preTaskList) { for (TaskInstance task : preTaskList) {
if (task.getName().equals(parentTask.getName())) { if (task.getName().equals(parentTask.getName())) {
preTaskId = task.getId(); preTaskId = task.getId();
ProcessInstanceMap map = ProcessInstanceMap map =
processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId); processInstanceMapDao.queryWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
if (map != null) { if (map != null) {
return map; return map;
} }
@ -1260,7 +1260,7 @@ public class ProcessServiceImpl implements ProcessService {
} }
// check create sub work flow firstly // check create sub work flow firstly
ProcessInstanceMap instanceMap = ProcessInstanceMap instanceMap =
processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId()); processInstanceMapDao.queryWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
if (null != instanceMap if (null != instanceMap
&& CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) { && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
// recover failover tolerance would not create a new command when the sub command already have been created // recover failover tolerance would not create a new command when the sub command already have been created
@ -1295,7 +1295,7 @@ public class ProcessServiceImpl implements ProcessService {
private void initSubInstanceState(ProcessInstance childInstance) { private void initSubInstanceState(ProcessInstance childInstance) {
if (childInstance != null) { if (childInstance != null) {
childInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "init sub workflow instance"); childInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "init sub workflow instance");
processInstanceDao.updateProcessInstance(childInstance); processInstanceDao.updateById(childInstance);
} }
} }
@ -2140,10 +2140,11 @@ public class ProcessServiceImpl implements ProcessService {
@Override @Override
public DagData genDagData(ProcessDefinition processDefinition) { public DagData genDagData(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> taskRelations = List<ProcessTaskRelation> taskRelations =
this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogList = taskDefinitionLogDao.getTaskDefineLogList(taskRelations); List<TaskDefinition> taskDefinitions = taskDefinitionLogDao.queryTaskDefineLogList(taskRelations)
List<TaskDefinition> taskDefinitions = .stream()
taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList()); .map(t -> (TaskDefinition) t)
.collect(Collectors.toList());
return new DagData(processDefinition, taskRelations, taskDefinitions); return new DagData(processDefinition, taskRelations, taskDefinitions);
} }
@ -2190,7 +2191,7 @@ public class ProcessServiceImpl implements ProcessService {
}); });
} }
if (CollectionUtils.isEmpty(taskDefinitionLogs)) { if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelationList); taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(taskRelationList);
} }
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream() Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
@ -2246,7 +2247,7 @@ public class ProcessServiceImpl implements ProcessService {
return processTaskMap; return processTaskMap;
} }
ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId()); ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
TaskInstance fatherTask = taskInstanceDao.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId()); TaskInstance fatherTask = taskInstanceDao.queryById(processInstanceMap.getParentTaskInstanceId());
if (fatherProcess != null) { if (fatherProcess != null) {
processTaskMap.put(fatherProcess, fatherTask); processTaskMap.put(fatherProcess, fatherTask);
@ -2581,13 +2582,13 @@ public class ProcessServiceImpl implements ProcessService {
if (processInstance != null if (processInstance != null
&& (processInstance.getState().isFailure() || processInstance.getState().isStop())) { && (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
List<TaskInstance> validTaskList = List<TaskInstance> validTaskList =
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(),
processInstance.getTestFlag()); processInstance.getTestFlag());
List<Long> instanceTaskCodeList = List<Long> instanceTaskCodeList =
validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList()); validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
List<ProcessTaskRelation> taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(), List<ProcessTaskRelation> taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()); processInstance.getProcessDefinitionVersion());
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelations); List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(taskRelations);
List<Long> definiteTaskCodeList = List<Long> definiteTaskCodeList =
taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES) taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES)
.map(TaskDefinitionLog::getCode).collect(Collectors.toList()); .map(TaskDefinitionLog::getCode).collect(Collectors.toList());
@ -2599,7 +2600,7 @@ public class ProcessServiceImpl implements ProcessService {
.map(TaskInstance::getId).collect(Collectors.toList()); .map(TaskInstance::getId).collect(Collectors.toList());
if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUCCESS, "success by task force success"); processInstance.setStateWithDesc(WorkflowExecutionStatus.SUCCESS, "success by task force success");
processInstanceDao.updateProcessInstance(processInstance); processInstanceDao.updateById(processInstance);
} }
} }
} }

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java

@ -55,7 +55,7 @@ public class SubWorkflowServiceImpl implements SubWorkflowService {
List<Long> allSubProcessInstanceId = relationSubWorkflows.stream() List<Long> allSubProcessInstanceId = relationSubWorkflows.stream()
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList()); .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList());
List<ProcessInstance> allSubProcessInstance = processInstanceDao.queryBatchIds(allSubProcessInstanceId); List<ProcessInstance> allSubProcessInstance = processInstanceDao.queryByIds(allSubProcessInstanceId);
allSubProcessInstance.sort(Comparator.comparing(ProcessInstance::getId)); allSubProcessInstance.sort(Comparator.comparing(ProcessInstance::getId));
return allSubProcessInstance; return allSubProcessInstance;
} }

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -700,7 +700,7 @@ public class ProcessServiceTest {
taskDefinitionLogs.add(taskDefinition); taskDefinitionLogs.add(taskDefinition);
taskDefinitionLogs.add(td2); taskDefinitionLogs.add(td2);
Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(taskDefinitionLogs); Mockito.when(taskDefinitionLogDao.queryTaskDefineLogList(any())).thenReturn(taskDefinitionLogs);
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())) Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt()))
.thenReturn(list); .thenReturn(list);

Loading…
Cancel
Save