diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java index 0020e4df77..4d52cc22aa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java @@ -53,7 +53,7 @@ public class PauseExecuteFunction implements ExecuteFunction 0) { + if (processInstanceDao.updateById(workflowInstance)) { log.info("Workflow instance {} ready to stop success, will call master to stop the workflow instance", workflowInstance.getName()); // todo: Use specific stop command instead of WorkflowStateEventChangeCommand diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index cd57d92166..a707beab0f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -108,7 +108,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -406,9 +405,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ ApiFuncIdentificationConstant.map.get(executeType)); checkMasterExists(); - ProcessInstance workflowInstance = - Optional.ofNullable(processInstanceDao.queryByWorkflowInstanceId(processInstanceId)) - .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId)); + ProcessInstance workflowInstance = processInstanceDao.queryOptionalById(processInstanceId) + .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId)); checkState(workflowInstance.getProjectCode() == projectCode, "The workflow instance's project code doesn't equals to the given project"); @@ -637,10 +635,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ processInstance.setCommandType(commandType); processInstance.addHistoryCmd(commandType); processInstance.setStateWithDesc(executionStatus, commandType.getDescp() + "by ui"); - int update = processInstanceDao.updateProcessInstance(processInstance); + boolean update = processInstanceDao.updateById(processInstance); // determine whether the process is normal - if (update > 0) { + if (update) { log.info("Process instance state is updated to {} in database, processInstanceName:{}.", executionStatus.getDesc(), processInstance.getName()); // directly send the process instance state change event to target master, not guarantee the event send diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index 263f25be42..ab55a8e1c5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -92,7 +92,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService @SuppressWarnings("unchecked") public Result queryLog(User loginUser, int taskInstId, int skipLineNum, int limit) { - TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId); + TaskInstance taskInstance = taskInstanceDao.queryById(taskInstId); if (taskInstance == null) { log.error("Task instance does not exist, taskInstanceId:{}.", taskInstId); @@ -120,7 +120,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService */ @Override public byte[] getLogBytes(User loginUser, int taskInstId) { - TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId); + TaskInstance taskInstance = taskInstanceDao.queryById(taskInstId); if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { throw new ServiceException("task instance is null or host is null"); } @@ -149,7 +149,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService return result; } // check whether the task instance can be found - TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId); + TaskInstance task = taskInstanceDao.queryById(taskInstId); if (task == null || StringUtils.isBlank(task.getHost())) { putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); return result; @@ -182,7 +182,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService throw new ServiceException("user has no permission"); } // check whether the task instance can be found - TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId); + TaskInstance task = taskInstanceDao.queryById(taskInstId); if (task == null || StringUtils.isBlank(task.getHost())) { throw new ServiceException("task instance is null or host is null"); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 4e27ea8efb..0d45e7dd35 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -697,7 +697,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro int workflowDefinitionVersion) { ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode).orElse(null); if (workflowDefinition == null || workflowDefinition.getVersion() != workflowDefinitionVersion) { - workflowDefinition = processDefinitionLogDao.queryProcessDefinitionLog(workflowDefinitionCode, + workflowDefinition = processDefinitionLogDao.queryByDefinitionCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion); } return Optional.ofNullable(workflowDefinition); @@ -1856,15 +1856,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro List processDefinitions = processDefinitionMapper .queryDefinitionListByProjectCodeAndProcessDefinitionCodes(projectCode, definitionCodesSet); - // query process task relation - List processTaskRelations = - processTaskRelationMapper.queryProcessTaskRelationsByProcessDefinitionCode( - processDefinitions.get(0).getCode(), - processDefinitions.get(0).getVersion()); - // query task definition log - List taskDefinitionLogsList = - taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations); + List taskDefinitionLogsList = taskDefinitionLogDao.queryByWorkflowDefinitionCodeAndVersion( + processDefinitions.get(0).getCode(), processDefinitions.get(0).getVersion()); List taskDefinitionList = new ArrayList<>(); for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogsList) { @@ -1914,8 +1908,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro List processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit); processInstanceList.forEach(processInstance -> processInstance .setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()))); - List taskDefinitionList = taskDefinitionLogDao.getTaskDefineLogList(processTaskRelationMapper - .queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode())); + List taskDefinitionList = taskDefinitionLogDao.queryByWorkflowDefinitionCodeAndVersion( + processDefinition.getCode(), processDefinition.getVersion()); Map taskDefinitionMap = taskDefinitionList.stream() .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); @@ -2153,7 +2147,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (isCopy) { log.info("Copy process definition..."); List taskDefinitionLogs = - taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations); + taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations); Map taskCodeMap = new HashMap<>(); for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { try { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 1df2ff4ce4..f9fe36dd0d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -283,7 +283,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Override public ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer workflowInstanceId) { - ProcessInstance processInstance = processInstanceDao.queryByWorkflowInstanceId(workflowInstanceId); + ProcessInstance processInstance = processInstanceDao.queryById(workflowInstanceId); if (processInstance == null) { throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId); } @@ -485,7 +485,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } List taskInstanceList = - taskInstanceDao.findValidTaskListByProcessId(processId, processInstance.getTestFlag()); + taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processId, processInstance.getTestFlag()); addDependResultForTaskList(loginUser, taskInstanceList); Map resultMap = new HashMap<>(); resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString()); @@ -498,7 +498,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Override public List queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) { - TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId); + TaskInstance taskInstance = taskInstanceDao.queryById(taskId); Map result = new HashMap<>(); if (taskInstance == null) { putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); @@ -516,11 +516,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId); } List relationSubWorkflows = relationSubWorkflowMapper - .queryAllSubProcessInstance(Long.valueOf(taskInstance.getProcessInstanceId()), + .queryAllSubProcessInstance((long) taskInstance.getProcessInstanceId(), taskInstance.getTaskCode()); List allSubProcessInstanceId = relationSubWorkflows.stream() .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList()); - List allSubWorkflows = processInstanceDao.queryBatchIds(allSubProcessInstanceId); + List allSubWorkflows = processInstanceDao.queryByIds(allSubProcessInstanceId); if (allSubWorkflows == null || allSubWorkflows.isEmpty()) { putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId); @@ -626,7 +626,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } - TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId); + TaskInstance taskInstance = taskInstanceDao.queryById(taskId); if (taskInstance == null) { log.error("Task instance does not exist, projectCode:{}, taskInstanceId{}.", projectCode, taskId); putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); @@ -786,8 +786,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } processInstance.setProcessDefinitionVersion(insertVersion); - int update = processInstanceDao.updateProcessInstance(processInstance); - if (update == 0) { + boolean update = processInstanceDao.updateById(processInstance); + if (!update) { log.error( "Update process instance version error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}", projectCode, processDefinition.getCode(), insertVersion); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java index 44c57c07f0..aa557bb3fd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java @@ -44,7 +44,7 @@ public class TaskDefinitionLogServiceImpl implements TaskDefinitionLogService { @Override public void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode) { List processTaskRelations = - processTaskRelationLogDao.findByWorkflowDefinitionCode(workflowDefinitionCode); + processTaskRelationLogDao.queryByWorkflowDefinitionCode(workflowDefinitionCode); if (CollectionUtils.isEmpty(processTaskRelations)) { return; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index f3d4f0e6e6..7551e3a1fa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -371,7 +371,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst @Override public void deleteByWorkflowInstanceId(Integer workflowInstanceId) { List needToDeleteTaskInstances = - taskInstanceDao.findTaskInstanceByWorkflowInstanceId(workflowInstanceId); + taskInstanceDao.queryByWorkflowInstanceId(workflowInstanceId); if (org.apache.commons.collections4.CollectionUtils.isEmpty(needToDeleteTaskInstances)) { return; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java index 7c6a2981cc..eba1f34e17 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java @@ -530,7 +530,7 @@ public class ExecuteFunctionServiceTest { when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN)) .thenReturn(checkProjectAndAuth()); - when(processInstanceDao.queryByWorkflowInstanceId(processInstanceId)).thenReturn(processInstance); + when(processInstanceDao.queryOptionalById(processInstanceId)).thenReturn(Optional.of(processInstance)); when(processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(processDefinitionCode, processDefinitionVersion)).thenReturn(processDefinition); Map result = diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index 000a5955a4..51b9faf55c 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -85,7 +85,7 @@ public class LoggerServiceTest { loginUser.setId(1); TaskInstance taskInstance = new TaskInstance(); taskInstance.setExecutorId(loginUser.getId() + 1); - Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); Result result = loggerService.queryLog(loginUser, 2, 1, 1); // TASK_INSTANCE_NOT_FOUND Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue()); @@ -123,7 +123,7 @@ public class LoggerServiceTest { // SUCCESS Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG); - Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); result = loggerService.queryLog(loginUser, 1, 1, 1); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); } @@ -135,7 +135,7 @@ public class LoggerServiceTest { loginUser.setId(1); TaskInstance taskInstance = new TaskInstance(); taskInstance.setExecutorId(loginUser.getId() + 1); - Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); // task instance is null try { @@ -198,7 +198,7 @@ public class LoggerServiceTest { Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); TaskInstance taskInstance = new TaskInstance(); - Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setProjectCode(projectCode); taskDefinition.setCode(1L); @@ -208,7 +208,7 @@ public class LoggerServiceTest { taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result); - Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1); Assertions.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); @@ -236,7 +236,7 @@ public class LoggerServiceTest { taskInstance.setLogPath("/temp/log"); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG)) .thenReturn(result); - Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString())) .thenReturn(new byte[0]); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 8f6d4c78fe..3cdd21f195 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -27,7 +27,6 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE; import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.times; @@ -801,9 +800,6 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1); Mockito.when(projectService.checkProjectAndAuth(user, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); - Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)) - .thenReturn(getProcessTaskRelation()); - Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(new ArrayList<>()); Map taskNotNuLLRes = processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10); Assertions.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 905b8aabaf..88f1bc99e5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -454,8 +454,9 @@ public class ProcessInstanceServiceTest { when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); when(processService.findProcessInstanceDetailById(processInstance.getId())) .thenReturn(Optional.of(processInstance)); - when(taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())) - .thenReturn(taskInstanceList); + when(taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), + processInstance.getTestFlag())) + .thenReturn(taskInstanceList); when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res); Map successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1); Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); @@ -497,7 +498,7 @@ public class ProcessInstanceServiceTest { putMsg(result, Status.SUCCESS, projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(null); + when(taskInstanceDao.queryById(1)).thenReturn(null); Map taskNullRes = processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1); Assertions.assertEquals(Status.TASK_INSTANCE_NOT_EXISTS, taskNullRes.get(Constants.STATUS)); @@ -507,7 +508,7 @@ public class ProcessInstanceServiceTest { taskInstance.setTaskType("HTTP"); taskInstance.setProcessInstanceId(1); putMsg(result, Status.SUCCESS, projectCode); - when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); + when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setProjectCode(projectCode); when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); @@ -527,7 +528,7 @@ public class ProcessInstanceServiceTest { subTask.setTaskType("SUB_PROCESS"); subTask.setProcessInstanceId(1); putMsg(result, Status.SUCCESS, projectCode); - when(taskInstanceDao.findTaskInstanceById(subTask.getId())).thenReturn(subTask); + when(taskInstanceDao.queryById(subTask.getId())).thenReturn(subTask); when(processService.findSubProcessInstance(subTask.getProcessInstanceId(), subTask.getId())).thenReturn(null); Map subprocessNotExistRes = processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1); @@ -593,7 +594,7 @@ public class ProcessInstanceServiceTest { when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant); when(processService.getTenantForProcess(Mockito.anyString(), Mockito.anyInt())) .thenReturn(tenant.getTenantCode()); - when(processInstanceDao.updateProcessInstance(processInstance)).thenReturn(1); + when(processInstanceDao.updateById(processInstance)).thenReturn(true); when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.FALSE)).thenReturn(1); List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 7a032191cb..761e556f17 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -395,8 +395,8 @@ public class TaskInstanceServiceTest { when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(taskInstanceMapper.selectById(1)).thenReturn(task); - when(taskInstanceDao.findTaskInstanceByCacheKey(cacheKey)).thenReturn(task, null); - when(taskInstanceDao.updateTaskInstance(task)).thenReturn(true); + when(taskInstanceDao.queryByCacheKey(cacheKey)).thenReturn(task, null); + when(taskInstanceDao.updateById(task)).thenReturn(true); TaskInstanceRemoveCacheResponse response = taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java index 3f3f2de981..6454e076e6 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java @@ -74,6 +74,7 @@ public interface TaskDefinitionLogMapper extends BaseMapper { Integer queryMaxVersionForDefinition(@Param("code") long code); /** + * todo: rename to query by code and version * @param taskDefinitions taskDefinition list * @return list */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java new file mode 100644 index 0000000000..05bf7f59fd --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.commons.collections4.CollectionUtils; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import lombok.NonNull; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public abstract class BaseDao> implements IDao { + + protected MYBATIS_MAPPER mybatisMapper; + + public BaseDao(@NonNull MYBATIS_MAPPER mybatisMapper) { + this.mybatisMapper = mybatisMapper; + } + + @Override + public ENTITY queryById(@NonNull Serializable id) { + return mybatisMapper.selectById(id); + } + + @Override + public Optional queryOptionalById(@NonNull Serializable id) { + return Optional.ofNullable(queryById(id)); + } + + @Override + public List queryByIds(Collection ids) { + if (CollectionUtils.isEmpty(ids)) { + return Collections.emptyList(); + } + return mybatisMapper.selectBatchIds(ids); + } + + @Override + public int insert(@NonNull ENTITY model) { + return mybatisMapper.insert(model); + } + + @Override + public void insertBatch(Collection models) { + if (CollectionUtils.isEmpty(models)) { + return; + } + for (ENTITY model : models) { + insert(model); + } + } + + @Override + public boolean updateById(@NonNull ENTITY model) { + return mybatisMapper.updateById(model) > 0; + } + + @Override + public boolean deleteById(@NonNull Serializable id) { + return mybatisMapper.deleteById(id) > 0; + } + + @Override + public boolean deleteByIds(Collection ids) { + if (CollectionUtils.isEmpty(ids)) { + return true; + } + return mybatisMapper.deleteBatchIds(ids) > 0; + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java index 843548c866..c13cf7f085 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java @@ -17,7 +17,9 @@ package org.apache.dolphinscheduler.dao.repository; -public interface DqExecuteResultDao { +import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; + +public interface DqExecuteResultDao extends IDao { void deleteByWorkflowInstanceId(Integer workflowInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java new file mode 100644 index 0000000000..9b2e0b7e60 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +import lombok.NonNull; + +public interface IDao { + + /** + * Query the entity by primary key. + */ + Entity queryById(@NonNull Serializable id); + + /** + * Same with {@link #queryById(Serializable)} but return {@link Optional} instead of null. + */ + Optional queryOptionalById(@NonNull Serializable id); + + /** + * Query the entity by primary keys. + */ + List queryByIds(Collection ids); + + /** + * Insert the entity. + */ + int insert(@NonNull Entity model); + + /** + * Insert the entities. + */ + void insertBatch(Collection models); + + /** + * Update the entity by primary key. + */ + boolean updateById(@NonNull Entity model); + + /** + * Delete the entity by primary key. + */ + boolean deleteById(@NonNull Serializable id); + + /** + * Delete the entities by primary keys. + */ + boolean deleteByIds(Collection ids); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java index febb009446..27b68196af 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.dao.repository; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.model.PageListingResult; import java.util.Collection; @@ -27,7 +26,7 @@ import java.util.Optional; import javax.annotation.Nullable; -public interface ProcessDefinitionDao { +public interface ProcessDefinitionDao extends IDao { /** * Listing the process definition belongs to the given userId and projectCode. @@ -42,17 +41,8 @@ public interface ProcessDefinitionDao { int userId, long projectCode); - /** - * query process definitions by definition codes and versions - * @param processInstances process instances where codes and version come from - * @return - */ - List queryProcessDefinitionsByCodesAndVersions(List processInstances); - Optional queryByCode(long code); - void deleteById(Integer workflowDefinitionId); - void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); List queryByCodes(Collection processDefinitionCodes); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java index 4f32d142cd..6d7ecce4ea 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java @@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.dao.repository; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; -public interface ProcessDefinitionLogDao { +public interface ProcessDefinitionLogDao extends IDao { - ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion); + ProcessDefinitionLog queryByDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion); void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index 32e9f97133..91a29eacba 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -22,29 +22,14 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.ibatis.annotations.Param; -import java.util.Date; -import java.util.List; - -public interface ProcessInstanceDao { - - public int insertProcessInstance(ProcessInstance processInstance); - - public int updateProcessInstance(ProcessInstance processInstance); +public interface ProcessInstanceDao extends IDao { /** * insert or update work process instance to database * * @param processInstance processInstance */ - public int upsertProcessInstance(ProcessInstance processInstance); - - List queryBatchIds(List processInstanceIds); - - void deleteByIds(List needToDeleteWorkflowInstanceIds); - - void deleteById(Integer workflowInstanceId); - - ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId); + void upsertProcessInstance(ProcessInstance processInstance); /** * find last scheduler process instance in the date interval @@ -53,7 +38,7 @@ public interface ProcessInstanceDao { * @param dateInterval dateInterval * @return process instance */ - ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); + ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); /** * find last manual process instance interval @@ -62,17 +47,7 @@ public interface ProcessInstanceDao { * @param dateInterval dateInterval * @return process instance */ - ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); - - /** - * find last running process instance - * - * @param definitionCode process definition code - * @param startTime start time - * @param endTime end time - * @return process instance - */ - ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag); + ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); /** * query first schedule process instance @@ -90,5 +65,5 @@ public interface ProcessInstanceDao { */ ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode); - ProcessInstance findSubProcessInstanceByParentId(Integer processInstanceId, Integer taskInstanceId); + ProcessInstance querySubProcessInstanceByParentId(Integer processInstanceId, Integer taskInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java index 7f74eaf313..2116ed3bbd 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java @@ -24,21 +24,7 @@ import java.util.List; /** * Process Instance Map DAO */ -public interface ProcessInstanceMapDao { - - /** - * Update process instance map - * @param processInstanceMap process instance map - * @return result - */ - int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap); - - /** - * Create process instance map to DB. - * @param processInstanceMap process instance map - * @return result - */ - int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap); +public interface ProcessInstanceMapDao extends IDao { /** * find work process map by parent process id and parent task id. @@ -46,7 +32,7 @@ public interface ProcessInstanceMapDao { * @param parentTaskId parentTaskId * @return process instance map */ - ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId); + ProcessInstanceMap queryWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId); List querySubWorkflowInstanceIds(int workflowInstanceId); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java index d835cdd3ac..8bd12026f0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java @@ -23,9 +23,9 @@ import org.apache.ibatis.annotations.Param; import java.util.List; -public interface ProcessTaskRelationLogDao { +public interface ProcessTaskRelationLogDao extends IDao { - List findByWorkflowDefinitionCode(long workflowDefinitionCode); + List queryByWorkflowDefinitionCode(long workflowDefinitionCode); void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java index 4069cf7a2d..a03f509fb5 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.dao.entity.Project; import java.util.Collection; import java.util.List; -public interface ProjectDao { +public interface ProjectDao extends IDao { List queryByCodes(Collection projectCodes); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java index 2145b18fbd..931a009041 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java @@ -26,10 +26,11 @@ import java.util.Set; /** * Task Instance DAO */ -public interface TaskDefinitionDao { +public interface TaskDefinitionDao extends IDao { /** * Get list of task definition by process definition code + * * @param processDefinitionCode process definition code * @return list of task definition */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java index b942196c32..131987938d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java @@ -26,21 +26,12 @@ import java.util.Set; /** * Task Definition Log DAO */ -public interface TaskDefinitionLogDao { +public interface TaskDefinitionLogDao extends IDao { - /** - * Get task definition log list - * @param processTaskRelations list of process task relation - * @return list of task definition - */ - List getTaskDefineLogList(List processTaskRelations); + List queryByWorkflowDefinitionCodeAndVersion(Long workflowDefinitionCode, + Integer workflowDefinitionVersion); - /** - * Query task definition log list by process task relation list - * @param processTaskRelations list of task relation - * @return list of task definition log - */ - List getTaskDefineLogListByRelation(List processTaskRelations); + List queryTaskDefineLogList(List processTaskRelations); void deleteByTaskDefinitionCodes(Set taskDefinitionCodes); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java index 65d391d4e1..4a6d1435ff 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java @@ -25,31 +25,18 @@ import java.util.List; /** * Task Instance DAO */ -public interface TaskInstanceDao { +public interface TaskInstanceDao extends IDao { /** * Update or Insert task instance to DB. * ID is null -> Insert * ID is not null -> Update + * * @param taskInstance task instance * @return result */ boolean upsertTaskInstance(TaskInstance taskInstance); - /** - * Insert task instance to DB. - * @param taskInstance task instance - * @return result - */ - boolean insertTaskInstance(TaskInstance taskInstance); - - /** - * Update task instance to DB. - * @param taskInstance task instance - * @return result - */ - boolean updateTaskInstance(TaskInstance taskInstance); - /** * Submit a task instance to DB. * @param taskInstance task instance @@ -64,7 +51,7 @@ public interface TaskInstanceDao { * @param testFlag test flag * @return list of valid task instance */ - List findValidTaskListByProcessId(Integer processInstanceId, int testFlag); + List queryValidTaskListByWorkflowInstanceId(Integer processInstanceId, int testFlag); /** * Query list of task instance by process instance id and task code @@ -72,28 +59,21 @@ public interface TaskInstanceDao { * @param taskCode task code * @return list of valid task instance */ - TaskInstance findTaskByInstanceIdAndCode(Integer processInstanceId, Long taskCode); + TaskInstance queryByWorkflowInstanceIdAndTaskCode(Integer processInstanceId, Long taskCode); /** * find previous task list by work process id * @param processInstanceId processInstanceId * @return task instance list */ - List findPreviousTaskListByWorkProcessId(Integer processInstanceId); - - /** - * find task instance by id - * @param taskId task id - * @return task instance - */ - TaskInstance findTaskInstanceById(Integer taskId); + List queryPreviousTaskListByWorkflowInstanceId(Integer processInstanceId); /** * find task instance by cache_key * @param cacheKey cache key * @return task instance */ - TaskInstance findTaskInstanceByCacheKey(String cacheKey); + TaskInstance queryByCacheKey(String cacheKey); /** * clear task instance cache by cache_key @@ -102,14 +82,7 @@ public interface TaskInstanceDao { */ Boolean clearCacheByCacheKey(String cacheKey); - /** - * find task instance list by id list - * @param idList task id list - * @return task instance list - */ - List findTaskInstanceByIdList(List idList); - void deleteByWorkflowInstanceId(int workflowInstanceId); - List findTaskInstanceByWorkflowInstanceId(Integer processInstanceId); + List queryByWorkflowInstanceId(Integer processInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java index f496d92f88..0ca8408669 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java @@ -17,20 +17,26 @@ package org.apache.dolphinscheduler.dao.repository.impl; +import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; import org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.NonNull; + import org.springframework.stereotype.Repository; @Repository -public class DqExecuteResultDaoImpl implements DqExecuteResultDao { +public class DqExecuteResultDaoImpl extends BaseDao + implements + DqExecuteResultDao { - @Autowired - private DqExecuteResultMapper dqExecuteResultMapper; + public DqExecuteResultDaoImpl(@NonNull DqExecuteResultMapper dqExecuteResultMapper) { + super(dqExecuteResultMapper); + } @Override public void deleteByWorkflowInstanceId(Integer workflowInstanceId) { - dqExecuteResultMapper.deleteByWorkflowInstanceId(workflowInstanceId); + mybatisMapper.deleteByWorkflowInstanceId(workflowInstanceId); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java index bf29c814af..7e4617ba5c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java @@ -18,43 +18,40 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.model.PageListingResult; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; import org.apache.commons.collections4.CollectionUtils; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Optional; -import java.util.stream.Collectors; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.NonNull; + import org.springframework.stereotype.Repository; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @Repository -public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao { +public class ProcessDefinitionDaoImpl extends BaseDao + implements + ProcessDefinitionDao { - @Autowired - private ProcessDefinitionMapper processDefinitionMapper; - @Autowired - private ProcessDefinitionLogMapper processDefinitionLogMapper; + public ProcessDefinitionDaoImpl(@NonNull ProcessDefinitionMapper processDefinitionMapper) { + super(processDefinitionMapper); + } @Override public PageListingResult listingProcessDefinition(int pageNumber, int pageSize, String searchVal, int userId, long projectCode) { Page page = new Page<>(pageNumber, pageSize); IPage processDefinitions = - processDefinitionMapper.queryDefineListPaging(page, searchVal, userId, projectCode); + mybatisMapper.queryDefineListPaging(page, searchVal, userId, projectCode); return PageListingResult.builder() .totalCount(processDefinitions.getTotal()) @@ -64,41 +61,14 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao { .build(); } - @Override - public List queryProcessDefinitionsByCodesAndVersions(List processInstances) { - if (Objects.isNull(processInstances) || processInstances.isEmpty()) { - return new ArrayList<>(); - } - List processDefinitionLogs = processInstances - .parallelStream() - .map(processInstance -> { - ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper - .queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); - return processDefinitionLog; - }) - .collect(Collectors.toList()); - - List processDefinitions = - processDefinitionLogs.stream().map(log -> (ProcessDefinition) log).collect(Collectors.toList()); - - return processDefinitions; - } - @Override public Optional queryByCode(long code) { - return Optional.ofNullable( - processDefinitionMapper.queryByCode(code)); - } - - @Override - public void deleteById(Integer workflowDefinitionId) { - processDefinitionMapper.deleteById(workflowDefinitionId); + return Optional.ofNullable(mybatisMapper.queryByCode(code)); } @Override public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { - processDefinitionMapper.deleteByCode(workflowDefinitionCode); + mybatisMapper.deleteByCode(workflowDefinitionCode); } @Override @@ -106,6 +76,6 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao { if (CollectionUtils.isEmpty(processDefinitionCodes)) { return Collections.emptyList(); } - return processDefinitionMapper.queryByCodes(processDefinitionCodes); + return mybatisMapper.queryByCodes(processDefinitionCodes); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java index 0f757193fb..1d21d714db 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java @@ -19,25 +19,30 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.NonNull; + import org.springframework.stereotype.Repository; @Repository -public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao { +public class ProcessDefinitionLogDaoImpl extends BaseDao + implements + ProcessDefinitionLogDao { - @Autowired - private ProcessDefinitionLogMapper processDefinitionLogMapper; + public ProcessDefinitionLogDaoImpl(@NonNull ProcessDefinitionLogMapper processDefinitionLogMapper) { + super(processDefinitionLogMapper); + } @Override - public ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion) { - return processDefinitionLogMapper.queryByDefinitionCodeAndVersion(workflowDefinitionCode, - workflowDefinitionVersion); + public ProcessDefinitionLog queryByDefinitionCodeAndVersion(long workflowDefinitionCode, + int workflowDefinitionVersion) { + return mybatisMapper.queryByDefinitionCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion); } @Override public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { - processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode); + mybatisMapper.deleteByProcessDefinitionCode(workflowDefinitionCode); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java index 1d4775cb29..688556c2da 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -17,20 +17,14 @@ package org.apache.dolphinscheduler.dao.repository.impl; -import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; -import org.apache.commons.collections4.CollectionUtils; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -39,57 +33,24 @@ import org.springframework.stereotype.Repository; @Slf4j @Repository -public class ProcessInstanceDaoImpl implements ProcessInstanceDao { - - @Autowired - private ProcessInstanceMapper processInstanceMapper; +public class ProcessInstanceDaoImpl extends BaseDao + implements + ProcessInstanceDao { @Autowired private ProcessInstanceMapMapper processInstanceMapMapper; - @Override - public int insertProcessInstance(ProcessInstance processInstance) { - return processInstanceMapper.insert(processInstance); - } - - @Override - public int updateProcessInstance(ProcessInstance processInstance) { - return processInstanceMapper.updateById(processInstance); + public ProcessInstanceDaoImpl(@NonNull ProcessInstanceMapper processInstanceMapper) { + super(processInstanceMapper); } @Override - public int upsertProcessInstance(@NonNull ProcessInstance processInstance) { + public void upsertProcessInstance(@NonNull ProcessInstance processInstance) { if (processInstance.getId() != null) { - return updateProcessInstance(processInstance); + updateById(processInstance); } else { - return insertProcessInstance(processInstance); - } - } - - @Override - public List queryBatchIds(List processInstanceIds) { - if (CollectionUtils.isEmpty(processInstanceIds)) { - return new ArrayList<>(); - } - return processInstanceMapper.selectBatchIds(processInstanceIds); - } - - @Override - public void deleteByIds(List needToDeleteWorkflowInstanceIds) { - if (CollectionUtils.isEmpty(needToDeleteWorkflowInstanceIds)) { - return; + insert(processInstance); } - processInstanceMapper.deleteBatchIds(needToDeleteWorkflowInstanceIds); - } - - @Override - public void deleteById(Integer workflowInstanceId) { - processInstanceMapper.deleteById(workflowInstanceId); - } - - @Override - public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) { - return processInstanceMapper.selectById(workflowInstanceId); } /** @@ -100,9 +61,9 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao { * @return process instance */ @Override - public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, - int testFlag) { - return processInstanceMapper.queryLastSchedulerProcess(definitionCode, + public ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, + int testFlag) { + return mybatisMapper.queryLastSchedulerProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime(), testFlag); @@ -116,30 +77,14 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao { * @return process instance */ @Override - public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) { - return processInstanceMapper.queryLastManualProcess(definitionCode, + public ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, + int testFlag) { + return mybatisMapper.queryLastManualProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime(), testFlag); } - /** - * find last running process instance - * - * @param definitionCode process definition code - * @param startTime start time - * @param endTime end time - * @return process instance - */ - @Override - public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) { - return processInstanceMapper.queryLastRunningProcess(definitionCode, - startTime, - endTime, - testFlag, - WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState()); - } - /** * query first schedule process instance * @@ -148,7 +93,7 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao { */ @Override public ProcessInstance queryFirstScheduleProcessInstance(Long definitionCode) { - return processInstanceMapper.queryFirstScheduleProcessInstance(definitionCode); + return mybatisMapper.queryFirstScheduleProcessInstance(definitionCode); } /** @@ -159,18 +104,18 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao { */ @Override public ProcessInstance queryFirstStartProcessInstance(Long definitionCode) { - return processInstanceMapper.queryFirstStartProcessInstance(definitionCode); + return mybatisMapper.queryFirstStartProcessInstance(definitionCode); } @Override - public ProcessInstance findSubProcessInstanceByParentId(Integer processInstanceId, Integer taskInstanceId) { + public ProcessInstance querySubProcessInstanceByParentId(Integer processInstanceId, Integer taskInstanceId) { ProcessInstance processInstance = null; ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryByParentId(processInstanceId, taskInstanceId); if (processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0) { return processInstance; } - processInstance = queryByWorkflowInstanceId(processInstanceMap.getProcessInstanceId()); + processInstance = queryById(processInstanceMap.getProcessInstanceId()); return processInstance; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java index 7d30c81ded..aa0931acfb 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java @@ -19,46 +19,39 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; import java.util.List; import lombok.NonNull; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; /** * Process Instance Map Dao implementation */ @Repository -public class ProcessInstanceMapDaoImpl implements ProcessInstanceMapDao { +public class ProcessInstanceMapDaoImpl extends BaseDao + implements + ProcessInstanceMapDao { - @Autowired - private ProcessInstanceMapMapper processInstanceMapMapper; - - @Override - public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) { - return processInstanceMapMapper.updateById(processInstanceMap); - } - - @Override - public int createWorkProcessInstanceMap(@NonNull ProcessInstanceMap processInstanceMap) { - return processInstanceMapMapper.insert(processInstanceMap); + public ProcessInstanceMapDaoImpl(@NonNull ProcessInstanceMapMapper processInstanceMapMapper) { + super(processInstanceMapMapper); } @Override - public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { - return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId); + public ProcessInstanceMap queryWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { + return mybatisMapper.queryByParentId(parentWorkProcessId, parentTaskId); } @Override public List querySubWorkflowInstanceIds(int workflowInstanceId) { - return processInstanceMapMapper.querySubIdListByParentId(workflowInstanceId); + return mybatisMapper.querySubIdListByParentId(workflowInstanceId); } @Override public void deleteByParentId(int workflowInstanceId) { - processInstanceMapMapper.deleteByParentId(workflowInstanceId); + mybatisMapper.deleteByParentId(workflowInstanceId); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java index 223fa5efbc..81c29b89da 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java @@ -19,31 +19,41 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao; +import org.apache.commons.collections4.CollectionUtils; + import java.util.List; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.NonNull; + import org.springframework.stereotype.Repository; @Repository -public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao { +public class ProcessTaskRelationLogDaoImpl extends BaseDao + implements + ProcessTaskRelationLogDao { - @Autowired - private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + public ProcessTaskRelationLogDaoImpl(@NonNull ProcessTaskRelationLogMapper processTaskRelationLogMapper) { + super(processTaskRelationLogMapper); + } @Override - public List findByWorkflowDefinitionCode(long workflowDefinitionCode) { - return processTaskRelationLogMapper.queryByProcessCode(workflowDefinitionCode); + public List queryByWorkflowDefinitionCode(long workflowDefinitionCode) { + return mybatisMapper.queryByProcessCode(workflowDefinitionCode); } @Override public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { - processTaskRelationLogMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode); + mybatisMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode); } @Override public int batchInsert(List taskRelationList) { - return processTaskRelationLogMapper.batchInsert(taskRelationList); + if (CollectionUtils.isEmpty(taskRelationList)) { + return 0; + } + return mybatisMapper.batchInsert(taskRelationList); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java index 1db9d7fb08..91fbed782f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java @@ -19,22 +19,25 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.ProjectDao; import java.util.Collection; import java.util.List; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.NonNull; + import org.springframework.stereotype.Repository; @Repository -public class ProjectDaoImpl implements ProjectDao { +public class ProjectDaoImpl extends BaseDao implements ProjectDao { - @Autowired - private ProjectMapper projectMapper; + public ProjectDaoImpl(@NonNull ProjectMapper projectMapper) { + super(projectMapper); + } @Override public List queryByCodes(Collection projectCodes) { - return projectMapper.queryByCodes(projectCodes); + return mybatisMapper.queryByCodes(projectCodes); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java index 8327ccff65..1672a456d1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; import org.apache.commons.collections4.CollectionUtils; @@ -32,10 +33,11 @@ import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -48,7 +50,7 @@ import com.google.common.collect.Lists; */ @Repository @Slf4j -public class TaskDefinitionDaoImpl implements TaskDefinitionDao { +public class TaskDefinitionDaoImpl extends BaseDao implements TaskDefinitionDao { @Autowired private ProcessDefinitionMapper processDefinitionMapper; @@ -59,8 +61,9 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao { @Autowired private TaskDefinitionLogMapper taskDefinitionLogMapper; - @Autowired - private TaskDefinitionMapper taskDefinitionMapper; + public TaskDefinitionDaoImpl(@NonNull TaskDefinitionMapper taskDefinitionMapper) { + super(taskDefinitionMapper); + } @Override public List getTaskDefinitionListByDefinition(long processDefinitionCode) { @@ -70,11 +73,13 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao { return Lists.newArrayList(); } - List processTaskRelations = processTaskRelationLogMapper - .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); - Set taskDefinitionSet = new HashSet<>(); - processTaskRelations.stream().filter(p -> p.getPostTaskCode() > 0) - .forEach(p -> taskDefinitionSet.add(new TaskDefinition(p.getPostTaskCode(), p.getPostTaskVersion()))); + List processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion( + processDefinition.getCode(), processDefinition.getVersion()); + Set taskDefinitionSet = processTaskRelations + .stream() + .filter(p -> p.getPostTaskCode() > 0) + .map(p -> new TaskDefinition(p.getPostTaskCode(), p.getPostTaskVersion())) + .collect(Collectors.toSet()); if (taskDefinitionSet.isEmpty()) { return Lists.newArrayList(); @@ -90,8 +95,7 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao { @Override public void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion) { - taskDefinitionMapper.deleteByWorkflowDefinitionCodeAndVersion(workflowDefinitionCode, - workflowDefinitionVersion); + mybatisMapper.deleteByWorkflowDefinitionCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion); } @Override @@ -99,7 +103,7 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao { if (CollectionUtils.isEmpty(needToDeleteTaskDefinitionCodes)) { return; } - taskDefinitionMapper.deleteByBatchCodes(new ArrayList<>(needToDeleteTaskDefinitionCodes)); + mybatisMapper.deleteByBatchCodes(new ArrayList<>(needToDeleteTaskDefinitionCodes)); } @Override @@ -107,7 +111,7 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao { if (CollectionUtils.isEmpty(taskDefinitionCodes)) { return Collections.emptyList(); } - return taskDefinitionMapper.queryByCodeList(taskDefinitionCodes); + return mybatisMapper.queryByCodeList(taskDefinitionCodes); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java index e1e27f5c7e..dd1c726107 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java @@ -20,71 +20,64 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; -import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.commons.collections4.CollectionUtils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; +import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; + +import lombok.NonNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; -import com.google.common.collect.Lists; - /** * Task Definition Log DAP implementation */ @Repository -public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao { +public class TaskDefinitionLogDaoImpl extends BaseDao + implements + TaskDefinitionLogDao { @Autowired - private TaskDefinitionDao taskDefinitionDao; + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; - @Autowired - private TaskDefinitionLogMapper taskDefinitionLogMapper; + public TaskDefinitionLogDaoImpl(@NonNull TaskDefinitionLogMapper taskDefinitionLogMapper) { + super(taskDefinitionLogMapper); + } @Override - public List getTaskDefineLogList(List processTaskRelations) { - Set taskDefinitionSet = new HashSet<>(); - for (ProcessTaskRelation processTaskRelation : processTaskRelations) { - if (processTaskRelation.getPreTaskCode() > 0) { - taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), - processTaskRelation.getPreTaskVersion())); - } - if (processTaskRelation.getPostTaskCode() > 0) { - taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), - processTaskRelation.getPostTaskVersion())); - } - } - if (taskDefinitionSet.isEmpty()) { - return Lists.newArrayList(); - } - return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); + public List queryByWorkflowDefinitionCodeAndVersion(Long workflowDefinitionCode, + Integer workflowDefinitionVersion) { + + List processTaskRelationLogs = processTaskRelationLogMapper + .queryByProcessCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion) + .stream() + .map(p -> (ProcessTaskRelation) p) + .collect(Collectors.toList()); + return queryTaskDefineLogList(processTaskRelationLogs); } @Override - public List getTaskDefineLogListByRelation(List processTaskRelations) { - List taskDefinitionLogs = new ArrayList<>(); - Map taskCodeVersionMap = new HashMap<>(); - for (ProcessTaskRelation processTaskRelation : processTaskRelations) { - if (processTaskRelation.getPreTaskCode() > 0) { - taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion()); - } - if (processTaskRelation.getPostTaskCode() > 0) { - taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()); - } + public List queryTaskDefineLogList(List processTaskRelations) { + if (CollectionUtils.isEmpty(processTaskRelations)) { + return Collections.emptyList(); + } + Set taskDefinitionSet = processTaskRelations.stream() + .filter(p -> p.getPostTaskCode() > 0) + .map(p -> new TaskDefinition(p.getPostTaskCode(), p.getPostTaskVersion())) + .collect(Collectors.toSet()); + + if (CollectionUtils.isEmpty(taskDefinitionSet)) { + return Collections.emptyList(); } - taskCodeVersionMap.forEach((code, version) -> { - taskDefinitionLogs.add((TaskDefinitionLog) taskDefinitionDao.findTaskDefinition(code, version)); - }); - return taskDefinitionLogs; + return mybatisMapper.queryByTaskDefinitions(taskDefinitionSet); } @Override @@ -92,6 +85,6 @@ public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao { if (CollectionUtils.isEmpty(taskDefinitionCodes)) { return; } - taskDefinitionLogMapper.deleteByTaskDefinitionCodes(taskDefinitionCodes); + mybatisMapper.deleteByTaskDefinitionCodes(taskDefinitionCodes); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java index 6bed7ed900..479247dbb9 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -24,17 +24,16 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; -import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; +import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import java.util.ArrayList; import java.util.Date; import java.util.List; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -45,38 +44,24 @@ import org.springframework.stereotype.Repository; */ @Repository @Slf4j -public class TaskInstanceDaoImpl implements TaskInstanceDao { - - @Autowired - private TaskInstanceMapper taskInstanceMapper; +public class TaskInstanceDaoImpl extends BaseDao implements TaskInstanceDao { @Autowired private ProcessInstanceMapper processInstanceMapper; - @Autowired - private ProcessInstanceMapDao processInstanceMapDao; + public TaskInstanceDaoImpl(@NonNull TaskInstanceMapper taskInstanceMapper) { + super(taskInstanceMapper); + } @Override public boolean upsertTaskInstance(TaskInstance taskInstance) { if (taskInstance.getId() != null) { - return updateTaskInstance(taskInstance); + return updateById(taskInstance); } else { - return insertTaskInstance(taskInstance); + return insert(taskInstance) > 0; } } - @Override - public boolean insertTaskInstance(TaskInstance taskInstance) { - int count = taskInstanceMapper.insert(taskInstance); - return count > 0; - } - - @Override - public boolean updateTaskInstance(TaskInstance taskInstance) { - int count = taskInstanceMapper.updateById(taskInstance); - return count > 0; - } - @Override public boolean submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) { WorkflowExecutionStatus processInstanceState = processInstance.getState(); @@ -128,7 +113,8 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao { return true; } List taskInstances = - this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag()); + this.queryValidTaskListByWorkflowInstanceId(taskInstance.getProcessInstanceId(), + taskInstance.getTestFlag()); for (TaskInstance task : taskInstances) { if (task.getState() == TaskExecutionStatus.FAILURE @@ -140,39 +126,34 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao { } @Override - public List findValidTaskListByProcessId(Integer processInstanceId, int testFlag) { - return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag); + public List queryValidTaskListByWorkflowInstanceId(Integer processInstanceId, int testFlag) { + return mybatisMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag); } @Override - public TaskInstance findTaskByInstanceIdAndCode(Integer processInstanceId, Long taskCode) { - return taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, taskCode); + public TaskInstance queryByWorkflowInstanceIdAndTaskCode(Integer processInstanceId, Long taskCode) { + return mybatisMapper.queryByInstanceIdAndCode(processInstanceId, taskCode); } @Override - public List findPreviousTaskListByWorkProcessId(Integer processInstanceId) { + public List queryPreviousTaskListByWorkflowInstanceId(Integer processInstanceId) { ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); - return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO, + return mybatisMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO, processInstance.getTestFlag()); } @Override - public TaskInstance findTaskInstanceById(Integer taskId) { - return taskInstanceMapper.selectById(taskId); - } - - @Override - public TaskInstance findTaskInstanceByCacheKey(String cacheKey) { + public TaskInstance queryByCacheKey(String cacheKey) { if (StringUtils.isEmpty(cacheKey)) { return null; } - return taskInstanceMapper.queryByCacheKey(cacheKey); + return mybatisMapper.queryByCacheKey(cacheKey); } @Override public Boolean clearCacheByCacheKey(String cacheKey) { try { - taskInstanceMapper.clearCacheByCacheKey(cacheKey); + mybatisMapper.clearCacheByCacheKey(cacheKey); return true; } catch (Exception e) { log.error("clear cache by cacheKey failed", e); @@ -180,22 +161,14 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao { } } - @Override - public List findTaskInstanceByIdList(List idList) { - if (CollectionUtils.isEmpty(idList)) { - return new ArrayList<>(); - } - return taskInstanceMapper.selectBatchIds(idList); - } - @Override public void deleteByWorkflowInstanceId(int workflowInstanceId) { - taskInstanceMapper.deleteByWorkflowInstanceId(workflowInstanceId); + mybatisMapper.deleteByWorkflowInstanceId(workflowInstanceId); } @Override - public List findTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) { - return taskInstanceMapper.findByWorkflowInstanceId(workflowInstanceId); + public List queryByWorkflowInstanceId(Integer workflowInstanceId) { + return mybatisMapper.findByWorkflowInstanceId(workflowInstanceId); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java index 7f222bd862..158e66a8b6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java @@ -73,7 +73,7 @@ public class TaskCacheEventHandler implements TaskEventHandler { TaskInstance taskInstance = taskInstanceOptional.get(); dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); - TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceById(taskEvent.getCacheTaskInstanceId()); + TaskInstance cacheTaskInstance = taskInstanceDao.queryById(taskEvent.getCacheTaskInstanceId()); // keep the task instance fields cacheTaskInstance.setId(taskInstance.getId()); @@ -90,7 +90,7 @@ public class TaskCacheEventHandler implements TaskEventHandler { processService.changeOutParam(taskInstance); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); TaskStateEvent stateEvent = TaskStateEvent.builder() .processInstanceId(taskEvent.getProcessInstanceId()) .taskInstanceId(taskEvent.getTaskInstanceId()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java index cc8000092b..4afaadf460 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java @@ -88,7 +88,7 @@ public class TaskDelayEventHandler implements TaskEventHandler { taskInstance.setExecutePath(taskEvent.getExecutePath()); taskInstance.setPid(taskEvent.getProcessId()); taskInstance.setAppLink(taskEvent.getAppIds()); - if (!taskInstanceDao.updateTaskInstance(taskInstance)) { + if (!taskInstanceDao.updateById(taskInstance)) { throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed"); } sendAckToWorker(taskEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java index efc6340703..112aec4a1b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java @@ -67,7 +67,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler { taskInstance.setState(TaskExecutionStatus.DISPATCH); taskInstance.setHost(taskEvent.getWorkerAddress()); try { - if (!taskInstanceDao.updateTaskInstance(taskInstance)) { + if (!taskInstanceDao.updateById(taskInstance)) { throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed"); } } catch (Exception ex) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java index fb8a3fbc74..be38e40ece 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java @@ -98,7 +98,7 @@ public class TaskResultEventHandler implements TaskEventHandler { taskInstance.setEndTime(taskEvent.getEndTime()); taskInstance.setVarPool(taskEvent.getVarPool()); processService.changeOutParam(taskInstance); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); sendAckToWorker(taskEvent); } catch (Exception ex) { TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java index 3edb6c3006..eb82261b23 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java @@ -84,7 +84,7 @@ public class TaskRunningEventHandler implements TaskEventHandler { taskInstance.setExecutePath(taskEvent.getExecutePath()); taskInstance.setPid(taskEvent.getProcessId()); taskInstance.setAppLink(taskEvent.getAppIds()); - if (!taskInstanceDao.updateTaskInstance(taskInstance)) { + if (!taskInstanceDao.updateById(taskInstance)) { throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed"); } sendAckToWorker(taskEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java index b15da0d64f..0c4800b8fd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java @@ -75,7 +75,7 @@ public class TaskUpdatePidEventHandler implements TaskEventHandler { taskInstance.setStartTime(taskEvent.getStartTime()); taskInstance.setHost(taskEvent.getWorkerAddress()); taskInstance.setPid(taskEvent.getProcessId()); - if (!taskInstanceDao.updateTaskInstance(taskInstance)) { + if (!taskInstanceDao.updateById(taskInstance)) { throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed"); } sendAckToWorker(taskEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java index a8359b6069..b90d57a343 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java @@ -394,7 +394,7 @@ public class StreamTaskExecuteRunnable implements Runnable { taskInstance.setEndTime(taskEvent.getEndTime()); taskInstance.setVarPool(taskEvent.getVarPool()); processService.changeOutParam(taskInstance); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); // send ack sendAckToWorker(taskEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 06f2bfc549..4258b672dc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -390,7 +390,7 @@ public class WorkflowExecuteRunnable implements Callable { TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { log.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId()); - TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId()); + TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId()); DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = taskExecuteRunnableMap.get(taskInstance.getTaskCode()); @@ -409,7 +409,7 @@ public class WorkflowExecuteRunnable implements Callable { log.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId()); boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue); if (acquireTaskGroup) { - TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId()); + TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId()); taskExecuteRunnableMap.get(taskInstance.getTaskCode()).dispatch(); log.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId()); return true; @@ -573,7 +573,7 @@ public class WorkflowExecuteRunnable implements Callable { */ public void refreshTaskInstance(int taskInstanceId) { log.info("task instance update: {} ", taskInstanceId); - TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); + TaskInstance taskInstance = taskInstanceDao.queryById(taskInstanceId); if (taskInstance == null) { log.error("can not find task instance, id:{}", taskInstanceId); return; @@ -837,7 +837,7 @@ public class WorkflowExecuteRunnable implements Callable { List processTaskRelations = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); List taskDefinitionLogs = - taskDefinitionLogDao.getTaskDefineLogListByRelation(processTaskRelations); + taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations); List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); forbiddenTaskMap.clear(); @@ -880,7 +880,7 @@ public class WorkflowExecuteRunnable implements Callable { processInstance.getRunTimes(), processInstance.getRecovery()); List validTaskInstanceList = - taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), + taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), processInstance.getTestFlag()); for (TaskInstance task : validTaskInstanceList) { try ( @@ -898,7 +898,7 @@ public class WorkflowExecuteRunnable implements Callable { TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) { task.setFlag(Flag.NO); - taskInstanceDao.updateTaskInstance(task); + taskInstanceDao.updateById(task); continue; } } @@ -919,7 +919,7 @@ public class WorkflowExecuteRunnable implements Callable { if (task.getState().isNeedFaultTolerance()) { log.info("TaskInstance needs fault tolerance, will be added to standby list."); task.setFlag(Flag.NO); - taskInstanceDao.updateTaskInstance(task); + taskInstanceDao.updateById(task); // tolerantTaskInstance add to standby list directly TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task); @@ -974,7 +974,7 @@ public class WorkflowExecuteRunnable implements Callable { processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)); processInstance.setGlobalParams(globalParams); - processInstanceDao.updateProcessInstance(processInstance); + processInstanceDao.updateById(processInstance); } } } @@ -1008,7 +1008,7 @@ public class WorkflowExecuteRunnable implements Callable { if (taskInstance.getId() != oldTaskInstanceId) { TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); oldTaskInstance.setFlag(Flag.NO); - taskInstanceDao.updateTaskInstance(oldTaskInstance); + taskInstanceDao.updateById(oldTaskInstance); validTaskMap.remove(taskInstance.getTaskCode()); taskExecuteRunnableMap.remove(taskInstance.getTaskCode()); } @@ -1350,7 +1350,7 @@ public class WorkflowExecuteRunnable implements Callable { existTaskInstance.setFlag(Flag.NO); existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE); validTaskMap.remove(existTaskInstance.getTaskCode()); - taskInstanceDao.updateTaskInstance(existTaskInstance); + taskInstanceDao.updateById(existTaskInstance); existTaskInstance = cloneTolerantTaskInstance(existTaskInstance); log.info("task {} cannot be take over will generate a tolerant task instance", existTaskInstance.getName()); @@ -1784,7 +1784,7 @@ public class WorkflowExecuteRunnable implements Callable { processInstance.setEndTime(new Date()); } try { - processInstanceDao.updateProcessInstance(processInstance); + processInstanceDao.updateById(processInstance); } catch (Exception ex) { // recover the status processInstance.setStateWithDesc(originStates, "recover state by DB error"); @@ -1866,7 +1866,7 @@ public class WorkflowExecuteRunnable implements Callable { try ( final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId)) { - TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); + TaskInstance taskInstance = taskInstanceDao.queryById(taskInstanceId); if (taskInstance == null || taskInstance.getState().isFinished()) { continue; } @@ -1897,7 +1897,7 @@ public class WorkflowExecuteRunnable implements Callable { while ((task = readyToSubmitTaskQueue.peek()) != null) { // stop tasks which is retrying if forced success happens if (task.taskCanRetry()) { - TaskInstance retryTask = taskInstanceDao.findTaskInstanceById(task.getId()); + TaskInstance retryTask = taskInstanceDao.queryById(task.getId()); if (retryTask != null && retryTask.getState().isForceSuccess()) { task.setState(retryTask.getState()); log.info( @@ -1975,7 +1975,7 @@ public class WorkflowExecuteRunnable implements Callable { .map(Integer::valueOf) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) { - return taskInstanceDao.findTaskInstanceByIdList(startTaskInstanceIds); + return taskInstanceDao.queryByIds(startTaskInstanceIds); } } return Collections.emptyList(); @@ -2137,7 +2137,7 @@ public class WorkflowExecuteRunnable implements Callable { if (validTaskMap.containsKey(taskCode)) { taskInstance = taskInstanceMap.get(validTaskMap.get(taskCode)); } else { - taskInstance = taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskCode); + taskInstance = taskInstanceDao.queryByWorkflowInstanceIdAndTaskCode(processInstance.getId(), taskCode); } if (taskInstance == null) { continue; @@ -2147,7 +2147,7 @@ public class WorkflowExecuteRunnable implements Callable { for (TaskInstance taskInstance : removeTaskInstances) { taskInstance.setFlag(Flag.NO); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); } Set removeSet = new HashSet<>(); @@ -2173,7 +2173,7 @@ public class WorkflowExecuteRunnable implements Callable { .collect(Collectors.toList()); processInstance.setVarPool(JSONUtils.toJsonString(processProperties)); - processInstanceDao.updateProcessInstance(processInstance); + processInstanceDao.updateById(processInstance); // remove task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap taskInstanceMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getValue().getTaskCode()))); @@ -2188,7 +2188,7 @@ public class WorkflowExecuteRunnable implements Callable { if (taskId.equals(taskInstance.getId())) { taskInstance.setCacheKey(taskIdAndCacheKey.getRight()); try { - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); } catch (Exception e) { log.error("update task instance cache key failed", e); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskKillOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskKillOperator.java index 7b95ad767e..c2bf2867c5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskKillOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskKillOperator.java @@ -65,7 +65,7 @@ public class TaskKillOperator implements TaskOperator { private void killTaskInstanceInDB(TaskInstance taskInstance) { taskInstance.setState(TaskExecutionStatus.KILL); taskInstance.setEndTime(new Date()); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); } private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance) throws RemotingException { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java index 146c3b894d..0da88a0dd3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java @@ -73,7 +73,7 @@ public class TaskTimeoutOperator implements TaskOperator { private void timeoutTaskInstanceInDB(TaskInstance taskInstance) { taskInstance.setState(TaskExecutionStatus.FAILURE); taskInstance.setEndTime(new Date()); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); } private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance) throws RemotingException { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java index c77dc671fd..845a78101c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java @@ -91,7 +91,7 @@ public class BlockingLogicTask extends BaseSyncLogicTask { private DependResult calculateConditionResult() throws MasterTaskExecuteException { // todo: Directly get the task instance from the cache Map completeTaskList = taskInstanceDao - .findValidTaskListByProcessId(taskExecutionContext.getProcessInstanceId(), + .queryValidTaskListByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTestFlag()) .stream() .collect(Collectors.toMap(TaskInstance::getTaskCode, Function.identity())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java index b0d989ee2d..803a8043ff 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java @@ -75,9 +75,10 @@ public class ConditionLogicTask extends BaseSyncLogicTask { private DependResult calculateConditionResult() { final ProcessInstance processInstance = - workflowInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId()); + workflowInstanceDao.queryById(taskExecutionContext.getProcessInstanceId()); final List taskInstances = - taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); + taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), + processInstance.getTestFlag()); final Map taskInstanceMap = taskInstances.stream().collect(Collectors.toMap(TaskInstance::getTaskCode, Function.identity())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java index 858057dbb8..c76f13ea4d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java @@ -84,7 +84,7 @@ public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFuncti this.taskDefinitionDao = taskDefinitionDao; this.taskInstanceDao = taskInstanceDao; this.processInstance = - processInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId()); + processInstanceDao.queryById(taskExecutionContext.getProcessInstanceId()); this.dependentDate = calculateDependentDate(); this.dependentTaskList = initializeDependentTaskList(); log.info("Initialized dependent task list successfully"); @@ -132,7 +132,7 @@ public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFuncti final Map taskDefinitionMap = taskDefinitionDao.queryByCodes(taskDefinitionCodes).stream() .collect(Collectors.toMap(TaskDefinition::getCode, Function.identity())); final TaskInstance taskInstance = - taskInstanceDao.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); + taskInstanceDao.queryById(taskExecutionContext.getTaskInstanceId()); List dependentExecutes = dependentParameters.getDependTaskList() .stream() .map(dependentTaskModel -> { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java index d3494042cd..377226dcb8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java @@ -101,9 +101,8 @@ public class DynamicLogicTask extends BaseAsyncLogicTask { this.processDefineMapper = processDefineMapper; this.commandMapper = commandMapper; - this.processInstance = - processInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId()); - this.taskInstance = taskInstanceDao.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); + this.processInstance = processInstanceDao.queryById(taskExecutionContext.getProcessInstanceId()); + this.taskInstance = taskInstanceDao.queryById(taskExecutionContext.getTaskInstanceId()); } @Override @@ -134,7 +133,7 @@ public class DynamicLogicTask extends BaseAsyncLogicTask { case REPEAT_RUNNING: existsSubProcessInstanceList.forEach(processInstance -> { processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN); - processInstanceDao.updateProcessInstance(processInstance); + processInstanceDao.updateById(processInstance); }); break; case START_FAILURE_TASK_PROCESS: @@ -143,7 +142,7 @@ public class DynamicLogicTask extends BaseAsyncLogicTask { subWorkflowService.filterFailedProcessInstances(existsSubProcessInstanceList); failedProcessInstances.forEach(processInstance -> { processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN); - processInstanceDao.updateProcessInstance(processInstance); + processInstanceDao.updateById(processInstance); }); break; } @@ -165,7 +164,7 @@ public class DynamicLogicTask extends BaseAsyncLogicTask { dynamicStartParams); ProcessInstance subProcessInstance = createSubProcessInstance(command); subProcessInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN); - processInstanceDao.insertProcessInstance(subProcessInstance); + processInstanceDao.insert(subProcessInstance); command.setProcessInstanceId(subProcessInstance.getId()); processInstanceList.add(subProcessInstance); } @@ -273,7 +272,7 @@ public class DynamicLogicTask extends BaseAsyncLogicTask { subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList); for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) { subProcessInstance.setState(stopStatus); - processInstanceDao.updateProcessInstance(subProcessInstance); + processInstanceDao.updateById(subProcessInstance); if (subProcessInstance.getState().isFinished()) { log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId()); return; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowAsyncTaskExecuteFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowAsyncTaskExecuteFunction.java index 08a4f6836e..3cea3237d5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowAsyncTaskExecuteFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowAsyncTaskExecuteFunction.java @@ -46,14 +46,14 @@ public class SubWorkflowAsyncTaskExecuteFunction implements AsyncTaskExecuteFunc public @NonNull AsyncTaskExecutionStatus getAsyncTaskExecutionStatus() { // query the status of sub workflow instance if (subWorkflowInstance == null) { - subWorkflowInstance = processInstanceDao.findSubProcessInstanceByParentId( + subWorkflowInstance = processInstanceDao.querySubProcessInstanceByParentId( taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); } if (subWorkflowInstance == null) { log.info("The sub workflow instance doesn't created"); return AsyncTaskExecutionStatus.RUNNING; } - subWorkflowInstance = processInstanceDao.queryByWorkflowInstanceId(subWorkflowInstance.getId()); + subWorkflowInstance = processInstanceDao.queryById(subWorkflowInstance.getId()); if (subWorkflowInstance != null && subWorkflowInstance.getState().isFinished()) { return subWorkflowInstance.getState().isSuccess() ? AsyncTaskExecutionStatus.SUCCESS : AsyncTaskExecutionStatus.FAILED; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java index 7b3008b581..e0e5b70500 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java @@ -73,7 +73,7 @@ public class SubWorkflowLogicTask extends BaseAsyncLogicTask taskInstanceList = - taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), testFlag); + taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), testFlag); for (TaskInstance task : taskInstanceList) { if (task.getTaskCode() == taskCode) { @@ -192,10 +192,10 @@ public class DependentExecute { private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) { ProcessInstance lastSchedulerProcess = - processInstanceDao.findLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag); + processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag); ProcessInstance lastManualProcess = - processInstanceDao.findLastManualProcessInterval(definitionCode, dateInterval, testFlag); + processInstanceDao.queryLastManualProcessInterval(definitionCode, dateInterval, testFlag); if (lastManualProcess == null) { return lastSchedulerProcess; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java index 958cd3a6da..4e1af4dc27 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java @@ -80,7 +80,7 @@ class TaskCacheEventHandlerTest { cacheTaskInstance.setProcessInstanceId(cacheProcessInstanceId); cacheTaskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>())); - Mockito.when(taskInstanceDao.findTaskInstanceById(cacheTaskInstanceId)).thenReturn(cacheTaskInstance); + Mockito.when(taskInstanceDao.queryById(cacheTaskInstanceId)).thenReturn(cacheTaskInstance); WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class); Mockito.when(processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId)) diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index cdfa2d2755..f224ff36c6 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -161,7 +161,7 @@ public class WorkflowExecuteRunnableTest { taskInstance4.setId(4); Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4"); - Mockito.when(taskInstanceDao.findTaskInstanceByIdList( + Mockito.when(taskInstanceDao.queryByIds( Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))) .thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4)); @@ -308,9 +308,11 @@ public class WorkflowExecuteRunnableTest { dagField.setAccessible(true); dagField.set(workflowExecuteThread, dag); - Mockito.when(taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskInstance1.getTaskCode())) + Mockito.when(taskInstanceDao.queryByWorkflowInstanceIdAndTaskCode(processInstance.getId(), + taskInstance1.getTaskCode())) .thenReturn(taskInstance1); - Mockito.when(taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskInstance2.getTaskCode())) + Mockito.when(taskInstanceDao.queryByWorkflowInstanceIdAndTaskCode(processInstance.getId(), + taskInstance2.getTaskCode())) .thenReturn(null); workflowExecuteThread.clearDataIfExecuteTask(); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskTest.java index d2e15d87e2..15fccaf9a1 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskTest.java @@ -87,7 +87,7 @@ class DynamicLogicTaskTest { taskExecutionContext = Mockito.mock(TaskExecutionContext.class); objectMapper = new ObjectMapper(); processInstance = new ProcessInstance(); - Mockito.when(processInstanceDao.queryByWorkflowInstanceId(Mockito.any())).thenReturn(processInstance); + Mockito.when(processInstanceDao.queryById(Mockito.any())).thenReturn(processInstance); dynamicLogicTask = new DynamicLogicTask( taskExecutionContext, processInstanceDao, @@ -163,7 +163,7 @@ class DynamicLogicTaskTest { dynamicLogicTask.resetProcessInstanceStatus(subProcessInstances); - Mockito.verify(processInstanceDao).updateProcessInstance(subProcessInstance); + Mockito.verify(processInstanceDao).updateById(subProcessInstance); Assertions.assertEquals(WorkflowExecutionStatus.WAIT_TO_RUN, subProcessInstance.getState()); } @@ -178,7 +178,7 @@ class DynamicLogicTaskTest { dynamicLogicTask.resetProcessInstanceStatus(subProcessInstances); - Mockito.verify(processInstanceDao).updateProcessInstance(failedSubProcessInstance); + Mockito.verify(processInstanceDao).updateById(failedSubProcessInstance); Assertions.assertEquals(WorkflowExecutionStatus.WAIT_TO_RUN, failedSubProcessInstance.getState()); } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index 2e79b44426..142932b0cd 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -154,7 +154,7 @@ public class FailoverServiceTest { given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())) .willReturn(Arrays.asList(processInstance)); doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class)); - given(taskInstanceDao.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt())) + given(taskInstanceDao.queryValidTaskListByWorkflowInstanceId(Mockito.anyInt(), Mockito.anyInt())) .willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance)); Thread.sleep(1000); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 6e99457389..519ae78299 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -386,9 +386,9 @@ public class ProcessServiceImpl implements ProcessService { info.setCommandType(CommandType.STOP); info.addHistoryCmd(CommandType.STOP); info.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by serial_priority strategy"); - int update = processInstanceDao.updateProcessInstance(info); + boolean update = processInstanceDao.updateById(info); // determine whether the process is normal - if (update > 0) { + if (update) { WorkflowStateEventChangeRequest workflowStateEventChangeRequest = new WorkflowStateEventChangeRequest( info.getId(), 0, info.getState(), info.getId(), 0); @@ -506,7 +506,7 @@ public class ProcessServiceImpl implements ProcessService { */ @Override public void removeTaskLogFile(Integer processInstanceId) { - List taskInstanceList = taskInstanceDao.findTaskInstanceByWorkflowInstanceId(processInstanceId); + List taskInstanceList = taskInstanceDao.queryByWorkflowInstanceId(processInstanceId); if (CollectionUtils.isEmpty(taskInstanceList)) { return; } @@ -832,7 +832,7 @@ public class ProcessServiceImpl implements ProcessService { failedList.addAll(killedList); failedList.addAll(toleranceList); for (Integer taskId : failedList) { - initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId)); + initTaskInstance(taskInstanceDao.queryById(taskId)); } cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, String.join(Constants.COMMA, convertIntListToString(failedList))); @@ -850,7 +850,7 @@ public class ProcessServiceImpl implements ProcessService { TaskExecutionStatus.KILL); for (Integer taskId : stopNodeList) { // initialize the pause state - initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId)); + initTaskInstance(taskInstanceDao.queryById(taskId)); } cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, String.join(Constants.COMMA, convertIntListToString(stopNodeList))); @@ -867,11 +867,11 @@ public class ProcessServiceImpl implements ProcessService { // delete all the valid tasks when complement data if id is not null if (processInstance.getId() != null) { List taskInstanceList = - taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), + taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), processInstance.getTestFlag()); for (TaskInstance taskInstance : taskInstanceList) { taskInstance.setFlag(Flag.NO); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); } } break; @@ -889,11 +889,11 @@ public class ProcessServiceImpl implements ProcessService { } // delete all the valid tasks when repeat running List validTaskList = - taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), + taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), processInstance.getTestFlag()); for (TaskInstance taskInstance : validTaskList) { taskInstance.setFlag(Flag.NO); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); } processInstance.setStartTime(new Date()); processInstance.setRestartTime(processInstance.getStartTime()); @@ -1050,7 +1050,7 @@ public class ProcessServiceImpl implements ProcessService { // update sub process id to process map table processInstanceMap.setProcessInstanceId(subProcessInstance.getId()); - processInstanceMapDao.updateWorkProcessInstanceMap(processInstanceMap); + processInstanceMapDao.updateById(processInstanceMap); } /** @@ -1112,11 +1112,11 @@ public class ProcessServiceImpl implements ProcessService { if (!taskInstance.isSubProcess() && (taskInstance.getState().isKill() || taskInstance.getState().isFailure())) { taskInstance.setFlag(Flag.NO); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); return; } taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS); - taskInstanceDao.updateTaskInstance(taskInstance); + taskInstanceDao.updateById(taskInstance); } /** @@ -1207,7 +1207,7 @@ public class ProcessServiceImpl implements ProcessService { processMap = findPreviousTaskProcessMap(parentInstance, parentTask); if (processMap != null) { processMap.setParentTaskInstanceId(parentTask.getId()); - processInstanceMapDao.updateWorkProcessInstanceMap(processMap); + processInstanceMapDao.updateById(processMap); return processMap; } } @@ -1215,7 +1215,7 @@ public class ProcessServiceImpl implements ProcessService { processMap = new ProcessInstanceMap(); processMap.setParentProcessInstanceId(parentInstance.getId()); processMap.setParentTaskInstanceId(parentTask.getId()); - processInstanceMapDao.createWorkProcessInstanceMap(processMap); + processInstanceMapDao.insert(processMap); return processMap; } @@ -1231,12 +1231,12 @@ public class ProcessServiceImpl implements ProcessService { Integer preTaskId = 0; List preTaskList = - taskInstanceDao.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId()); + taskInstanceDao.queryPreviousTaskListByWorkflowInstanceId(parentProcessInstance.getId()); for (TaskInstance task : preTaskList) { if (task.getName().equals(parentTask.getName())) { preTaskId = task.getId(); ProcessInstanceMap map = - processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId); + processInstanceMapDao.queryWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId); if (map != null) { return map; } @@ -1260,7 +1260,7 @@ public class ProcessServiceImpl implements ProcessService { } // check create sub work flow firstly ProcessInstanceMap instanceMap = - processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId()); + processInstanceMapDao.queryWorkProcessMapByParent(parentProcessInstance.getId(), task.getId()); if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) { // recover failover tolerance would not create a new command when the sub command already have been created @@ -1295,7 +1295,7 @@ public class ProcessServiceImpl implements ProcessService { private void initSubInstanceState(ProcessInstance childInstance) { if (childInstance != null) { childInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "init sub workflow instance"); - processInstanceDao.updateProcessInstance(childInstance); + processInstanceDao.updateById(childInstance); } } @@ -2140,10 +2140,11 @@ public class ProcessServiceImpl implements ProcessService { @Override public DagData genDagData(ProcessDefinition processDefinition) { List taskRelations = - this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); - List taskDefinitionLogList = taskDefinitionLogDao.getTaskDefineLogList(taskRelations); - List taskDefinitions = - taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList()); + findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); + List taskDefinitions = taskDefinitionLogDao.queryTaskDefineLogList(taskRelations) + .stream() + .map(t -> (TaskDefinition) t) + .collect(Collectors.toList()); return new DagData(processDefinition, taskRelations, taskDefinitions); } @@ -2190,7 +2191,7 @@ public class ProcessServiceImpl implements ProcessService { }); } if (CollectionUtils.isEmpty(taskDefinitionLogs)) { - taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelationList); + taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(taskRelationList); } Map taskDefinitionLogMap = taskDefinitionLogs.stream() .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); @@ -2246,7 +2247,7 @@ public class ProcessServiceImpl implements ProcessService { return processTaskMap; } ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId()); - TaskInstance fatherTask = taskInstanceDao.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId()); + TaskInstance fatherTask = taskInstanceDao.queryById(processInstanceMap.getParentTaskInstanceId()); if (fatherProcess != null) { processTaskMap.put(fatherProcess, fatherTask); @@ -2581,13 +2582,13 @@ public class ProcessServiceImpl implements ProcessService { if (processInstance != null && (processInstance.getState().isFailure() || processInstance.getState().isStop())) { List validTaskList = - taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), + taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), processInstance.getTestFlag()); List instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList()); List taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); - List taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelations); + List taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(taskRelations); List definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES) .map(TaskDefinitionLog::getCode).collect(Collectors.toList()); @@ -2599,7 +2600,7 @@ public class ProcessServiceImpl implements ProcessService { .map(TaskInstance::getId).collect(Collectors.toList()); if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { processInstance.setStateWithDesc(WorkflowExecutionStatus.SUCCESS, "success by task force success"); - processInstanceDao.updateProcessInstance(processInstance); + processInstanceDao.updateById(processInstance); } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java index 24fb8c35db..b45f4cc5a5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java @@ -55,7 +55,7 @@ public class SubWorkflowServiceImpl implements SubWorkflowService { List allSubProcessInstanceId = relationSubWorkflows.stream() .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList()); - List allSubProcessInstance = processInstanceDao.queryBatchIds(allSubProcessInstanceId); + List allSubProcessInstance = processInstanceDao.queryByIds(allSubProcessInstanceId); allSubProcessInstance.sort(Comparator.comparing(ProcessInstance::getId)); return allSubProcessInstance; } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 1e94407a4b..93c3a95b29 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -700,7 +700,7 @@ public class ProcessServiceTest { taskDefinitionLogs.add(taskDefinition); taskDefinitionLogs.add(td2); - Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(taskDefinitionLogs); + Mockito.when(taskDefinitionLogDao.queryTaskDefineLogList(any())).thenReturn(taskDefinitionLogs); Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())) .thenReturn(list);