diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index eecb40cc63..239c9669f1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -33,9 +33,9 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClient; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.lang3.StringUtils; @@ -60,7 +60,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService private static final String LOG_HEAD_FORMAT = "[LOG-PATH]: %s, [HOST]: %s%s"; @Autowired - private ProcessService processService; + private TaskInstanceDao taskInstanceDao; @Autowired private LogClient logClient; @@ -86,7 +86,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService @SuppressWarnings("unchecked") public Result queryLog(int taskInstId, int skipLineNum, int limit) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); + TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId); if (taskInstance == null) { logger.error("Task instance does not exist, taskInstanceId:{}.", taskInstId); @@ -111,7 +111,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService */ @Override public byte[] getLogBytes(int taskInstId) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); + TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId); if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { throw new ServiceException("task instance is null or host is null"); } @@ -138,7 +138,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService return result; } // check whether the task instance can be found - TaskInstance task = processService.findTaskInstanceById(taskInstId); + TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId); if (task == null || StringUtils.isBlank(task.getHost())) { putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); return result; @@ -171,7 +171,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService throw new ServiceException("user has no permission"); } // check whether the task instance can be found - TaskInstance task = processService.findTaskInstanceById(taskInstId); + TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId); if (task == null || StringUtils.isBlank(task.getHost())) { throw new ServiceException("task instance is null or host is null"); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index ca6ccf7d65..acb05b897e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -103,6 +103,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.model.PageListingResult; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; @@ -200,6 +201,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired private ProcessService processService; + @Autowired + private TaskDefinitionLogDao taskDefinitionLogDao; + @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; @@ -388,9 +392,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (insertVersion == 0) { logger.error("Save process definition error, processCode:{}.", processDefinition.getCode()); throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); - } else + } else { logger.info("Save process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion); + } int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); @@ -398,9 +403,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.", processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); - } else + } else { logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.", processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); + } saveOtherRelation(loginUser, processDefinition, result, otherParamsJson); @@ -885,9 +891,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro logger.error("Update process definition error, processCode:{}.", processDefinition.getCode()); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); - } else + } else { logger.info("Update process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion); + } taskUsedInOtherTaskValid(processDefinition, taskRelationList); int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), @@ -1136,9 +1143,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro projectCode, code, schedule.getId()); putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR); - } else + } else { logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId()); + } schedulerService.deleteSchedule(project.getId(), schedule.getId()); } } @@ -1184,8 +1192,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (CollectionUtils.isNotEmpty(dagDataSchedules)) { logger.info("Start download process definition file, processDefinitionCodes:{}.", defineCodeSet); downloadProcessDefinitionFile(response, dagDataSchedules); - } else + } else { logger.error("There is no exported process dag data."); + } } /** @@ -1856,7 +1865,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinitions.get(0).getVersion()); // query task definition log - List taskDefinitionLogsList = processService.genTaskDefineList(processTaskRelations); + List taskDefinitionLogsList = + taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations); List taskDefinitionList = new ArrayList<>(); for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogsList) { @@ -1906,7 +1916,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro List processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit); processInstanceList.forEach(processInstance -> processInstance .setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()))); - List taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper + List taskDefinitionList = taskDefinitionLogDao.getTaskDefineLogList(processTaskRelationMapper .queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode())); Map taskDefinitionMap = taskDefinitionList.stream() .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); @@ -2144,7 +2154,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro String otherParamsJson = doOtherOperateProcess(loginUser, processDefinition); if (isCopy) { logger.info("Copy process definition..."); - List taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations); + List taskDefinitionLogs = + taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations); Map taskCodeMap = new HashMap<>(); for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { try { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index ce1747ca85..3d350151aa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -71,6 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.WorkflowUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.model.Property; @@ -129,6 +130,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Autowired ProcessService processService; + @Autowired + TaskInstanceDao taskInstanceDao; + @Autowired ProcessInstanceMapper processInstanceMapper; @@ -366,7 +370,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } List taskInstanceList = - processService.findValidTaskListByProcessId(processId, processInstance.getTestFlag()); + taskInstanceDao.findValidTaskListByProcessId(processId, processInstance.getTestFlag()); addDependResultForTaskList(taskInstanceList); Map resultMap = new HashMap<>(); resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString()); @@ -444,7 +448,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } - TaskInstance taskInstance = processService.findTaskInstanceById(taskId); + TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId); if (taskInstance == null) { logger.error("Task instance does not exist, projectCode:{}, taskInstanceId{}.", projectCode, taskId); putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index 747903da38..ca34d74730 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.service.log.LogClient; -import org.apache.dolphinscheduler.service.process.ProcessService; import java.text.MessageFormat; import java.util.HashMap; @@ -63,7 +63,7 @@ public class LoggerServiceTest { private LoggerServiceImpl loggerService; @Mock - private ProcessService processService; + private TaskInstanceDao taskInstanceDao; @Mock private ProjectMapper projectMapper; @@ -81,7 +81,7 @@ public class LoggerServiceTest { public void testQueryDataSourceList() { TaskInstance taskInstance = new TaskInstance(); - Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); Result result = loggerService.queryLog(2, 1, 1); // TASK_INSTANCE_NOT_FOUND Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue()); @@ -98,7 +98,7 @@ public class LoggerServiceTest { // SUCCESS taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); - Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); result = loggerService.queryLog(1, 1, 1); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); } @@ -107,7 +107,7 @@ public class LoggerServiceTest { public void testGetLogBytes() { TaskInstance taskInstance = new TaskInstance(); - Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); // task instance is null try { @@ -146,7 +146,7 @@ public class LoggerServiceTest { Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); TaskInstance taskInstance = new TaskInstance(); - Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setProjectCode(projectCode); taskDefinition.setCode(1L); @@ -156,7 +156,7 @@ public class LoggerServiceTest { taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result); - Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1); Assertions.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); @@ -184,7 +184,7 @@ public class LoggerServiceTest { taskInstance.setLogPath("/temp/log"); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG)) .thenReturn(result); - Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString())) .thenReturn(new byte[0]); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 05c9a0b044..31502b917e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -28,6 +28,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE; import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT; import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest; @@ -67,6 +68,7 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.model.PageListingResult; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -149,6 +151,9 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { @Mock private ProcessService processService; + @Mock + private TaskDefinitionLogDao taskDefinitionLogDao; + @Mock private ProcessInstanceService processInstanceService; @@ -756,8 +761,10 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { result.put(Constants.STATUS, Status.SUCCESS); Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1); Mockito.when(projectService.checkProjectAndAuth(user, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result); - Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); + Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)) + .thenReturn(getProcessTaskRelation()); + Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(new ArrayList<>()); Map taskNotNuLLRes = processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10); Assertions.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index feefd1f6f2..f6bbefd8e4 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -57,6 +57,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.service.expand.CuringParamsService; @@ -104,6 +105,9 @@ public class ProcessInstanceServiceTest { @Mock ProcessService processService; + @Mock + TaskInstanceDao taskInstanceDao; + @Mock ProcessInstanceDao processInstanceDao; @@ -408,7 +412,7 @@ public class ProcessInstanceServiceTest { when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); when(processService.findProcessInstanceDetailById(processInstance.getId())) .thenReturn(Optional.of(processInstance)); - when(processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())) + when(taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())) .thenReturn(taskInstanceList); when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res); Map successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1); @@ -451,7 +455,7 @@ public class ProcessInstanceServiceTest { putMsg(result, Status.SUCCESS, projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - when(processService.findTaskInstanceById(1)).thenReturn(null); + when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(null); Map taskNullRes = processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1); Assertions.assertEquals(Status.TASK_INSTANCE_NOT_EXISTS, taskNullRes.get(Constants.STATUS)); @@ -461,7 +465,7 @@ public class ProcessInstanceServiceTest { taskInstance.setTaskType("HTTP"); taskInstance.setProcessInstanceId(1); putMsg(result, Status.SUCCESS, projectCode); - when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance); TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setProjectCode(projectCode); when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); @@ -481,7 +485,7 @@ public class ProcessInstanceServiceTest { subTask.setTaskType("SUB_PROCESS"); subTask.setProcessInstanceId(1); putMsg(result, Status.SUCCESS, projectCode); - when(processService.findTaskInstanceById(subTask.getId())).thenReturn(subTask); + when(taskInstanceDao.findTaskInstanceById(subTask.getId())).thenReturn(subTask); when(processService.findSubProcessInstance(subTask.getProcessInstanceId(), subTask.getId())).thenReturn(null); Map subprocessNotExistRes = processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java new file mode 100644 index 0000000000..5465c6d40d --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; + +/** + * Process Instance Map DAO + */ +public interface ProcessInstanceMapDao { + + /** + * Update process instance map + * @param processInstanceMap process instance map + * @return result + */ + int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap); + + /** + * Create process instance map to DB. + * @param processInstanceMap process instance map + * @return result + */ + int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap); + + /** + * find work process map by parent process id and parent task id. + * @param parentWorkProcessId parentWorkProcessId + * @param parentTaskId parentTaskId + * @return process instance map + */ + ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java new file mode 100644 index 0000000000..9aaed9e576 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; + +import java.util.List; + +/** + * Task Instance DAO + */ +public interface TaskDefinitionDao { + + /** + * Get list of task definition by process definition code + * @param processDefinitionCode process definition code + * @return list of task definition + */ + List getTaskDefinitionListByDefinition(long processDefinitionCode); + + /** + * Query task definition by code and version + * @param taskCode task code + * @param taskDefinitionVersion task definition version + * @return task definition + */ + TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java new file mode 100644 index 0000000000..8b6f290f49 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; + +import java.util.List; + +/** + * Task Definition Log DAO + */ +public interface TaskDefinitionLogDao { + + /** + * Get task definition log list + * @param processTaskRelations list of process task relation + * @return list of task definition + */ + List getTaskDefineLogList(List processTaskRelations); + + /** + * Query task definition log list by process task relation list + * @param processTaskRelations list of task relation + * @return list of task definition log + */ + List getTaskDefineLogListByRelation(List processTaskRelations); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java new file mode 100644 index 0000000000..2b2adc9a3f --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import java.util.List; + +/** + * Task Instance DAO + */ +public interface TaskInstanceDao { + + /** + * Update or Insert task instance to DB. + * ID is null -> Insert + * ID is not null -> Update + * @param taskInstance task instance + * @return result + */ + boolean upsertTaskInstance(TaskInstance taskInstance); + + /** + * Insert task instance to DB. + * @param taskInstance task instance + * @return result + */ + boolean insertTaskInstance(TaskInstance taskInstance); + + /** + * Update task instance to DB. + * @param taskInstance task instance + * @return result + */ + boolean updateTaskInstance(TaskInstance taskInstance); + + /** + * Submit a task instance to DB. + * @param taskInstance task instance + * @param processInstance process instance + * @return task instance + */ + TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance); + + /** + * Query list of valid task instance by process instance id + * @param processInstanceId processInstanceId + * @param testFlag test flag + * @return list of valid task instance + */ + List findValidTaskListByProcessId(Integer processInstanceId, int testFlag); + + /** + * find previous task list by work process id + * @param processInstanceId processInstanceId + * @return task instance list + */ + List findPreviousTaskListByWorkProcessId(Integer processInstanceId); + + /** + * find task instance by id + * @param taskId task id + * @return task instance + */ + TaskInstance findTaskInstanceById(Integer taskId); + + /** + * find task instance list by id list + * @param idList task id list + * @return task instance list + */ + List findTaskInstanceByIdList(List idList); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java new file mode 100644 index 0000000000..94dbc075bd --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; + +import lombok.NonNull; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +/** + * Process Instance Map Dao implementation + */ +@Repository +public class ProcessInstanceMapDaoImpl implements ProcessInstanceMapDao { + + @Autowired + private ProcessInstanceMapMapper processInstanceMapMapper; + + @Override + public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) { + return processInstanceMapMapper.updateById(processInstanceMap); + } + + @Override + public int createWorkProcessInstanceMap(@NonNull ProcessInstanceMap processInstanceMap) { + return processInstanceMapMapper.insert(processInstanceMap); + } + + @Override + public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { + return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java new file mode 100644 index 0000000000..33b6033454 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import com.google.common.collect.Lists; + +/** + * Task Definition DAO Implementation + */ +@Repository +public class TaskDefinitionDaoImpl implements TaskDefinitionDao { + + private final Logger logger = LoggerFactory.getLogger(TaskDefinitionDaoImpl.class); + + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + + @Autowired + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + + @Autowired + private TaskDefinitionLogMapper taskDefinitionLogMapper; + + @Autowired + private TaskDefinitionMapper taskDefinitionMapper; + + @Override + public List getTaskDefinitionListByDefinition(long processDefinitionCode) { + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + if (processDefinition == null) { + logger.error("Cannot find process definition, code: {}", processDefinitionCode); + return Lists.newArrayList(); + } + + List processTaskRelations = processTaskRelationLogMapper + .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); + Set taskDefinitionSet = new HashSet<>(); + processTaskRelations.stream().filter(p -> p.getPostTaskCode() > 0) + .forEach(p -> taskDefinitionSet.add(new TaskDefinition(p.getPostTaskCode(), p.getPostTaskVersion()))); + + if (taskDefinitionSet.isEmpty()) { + return Lists.newArrayList(); + } + List taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); + return Lists.newArrayList(taskDefinitionLogs); + } + + @Override + public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) { + return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion); + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java new file mode 100644 index 0000000000..efd000a410 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import com.google.common.collect.Lists; + +/** + * Task Definition Log DAP implementation + */ +@Repository +public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao { + + private final Logger logger = LoggerFactory.getLogger(TaskDefinitionLogDaoImpl.class); + + @Autowired + private TaskDefinitionDao taskDefinitionDao; + + @Autowired + private TaskDefinitionLogMapper taskDefinitionLogMapper; + + @Override + public List getTaskDefineLogList(List processTaskRelations) { + Set taskDefinitionSet = new HashSet<>(); + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + if (processTaskRelation.getPreTaskCode() > 0) { + taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), + processTaskRelation.getPreTaskVersion())); + } + if (processTaskRelation.getPostTaskCode() > 0) { + taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), + processTaskRelation.getPostTaskVersion())); + } + } + if (taskDefinitionSet.isEmpty()) { + return Lists.newArrayList(); + } + return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); + } + + @Override + public List getTaskDefineLogListByRelation(List processTaskRelations) { + List taskDefinitionLogs = new ArrayList<>(); + Map taskCodeVersionMap = new HashMap<>(); + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + if (processTaskRelation.getPreTaskCode() > 0) { + taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion()); + } + if (processTaskRelation.getPostTaskCode() > 0) { + taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()); + } + } + taskCodeVersionMap.forEach((code, version) -> { + taskDefinitionLogs.add((TaskDefinitionLog) taskDefinitionDao.findTaskDefinition(code, version)); + }); + return taskDefinitionLogs; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java new file mode 100644 index 0000000000..648085cbd3 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; + +import org.apache.commons.collections.CollectionUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +/** + * Task Instance DAO implementation + */ +@Repository +public class TaskInstanceDaoImpl implements TaskInstanceDao { + + private final Logger logger = LoggerFactory.getLogger(TaskInstanceDaoImpl.class); + + @Autowired + private TaskInstanceMapper taskInstanceMapper; + + @Autowired + private ProcessInstanceMapper processInstanceMapper; + + @Autowired + private ProcessInstanceMapDao processInstanceMapDao; + + @Override + public boolean upsertTaskInstance(TaskInstance taskInstance) { + if (taskInstance.getId() != null) { + return updateTaskInstance(taskInstance); + } else { + return insertTaskInstance(taskInstance); + } + } + + @Override + public boolean insertTaskInstance(TaskInstance taskInstance) { + int count = taskInstanceMapper.insert(taskInstance); + return count > 0; + } + + @Override + public boolean updateTaskInstance(TaskInstance taskInstance) { + int count = taskInstanceMapper.updateById(taskInstance); + return count > 0; + } + + @Override + public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) { + WorkflowExecutionStatus processInstanceState = processInstance.getState(); + if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) { + logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}", + processInstance.getId(), + processInstanceState, + taskInstance.getTaskCode()); + return null; + } + if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) { + taskInstance.setState(TaskExecutionStatus.PAUSE); + } + taskInstance.setExecutorId(processInstance.getExecutorId()); + taskInstance.setState(getSubmitTaskState(taskInstance, processInstance)); + if (taskInstance.getSubmitTime() == null) { + taskInstance.setSubmitTime(new Date()); + } + if (taskInstance.getFirstSubmitTime() == null) { + taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime()); + } + boolean saveResult = upsertTaskInstance(taskInstance); + if (!saveResult) { + return null; + } + return taskInstance; + } + + private TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) { + TaskExecutionStatus state = taskInstance.getState(); + if (state == TaskExecutionStatus.RUNNING_EXECUTION + || state == TaskExecutionStatus.DELAY_EXECUTION + || state == TaskExecutionStatus.KILL + || state == TaskExecutionStatus.DISPATCH) { + return state; + } + + if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) { + state = TaskExecutionStatus.PAUSE; + } else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP + || !checkProcessStrategy(taskInstance, processInstance)) { + state = TaskExecutionStatus.KILL; + } else { + state = TaskExecutionStatus.SUBMITTED_SUCCESS; + } + return state; + } + + private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) { + FailureStrategy failureStrategy = processInstance.getFailureStrategy(); + if (failureStrategy == FailureStrategy.CONTINUE) { + return true; + } + List taskInstances = + this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag()); + + for (TaskInstance task : taskInstances) { + if (task.getState() == TaskExecutionStatus.FAILURE + && task.getRetryTimes() >= task.getMaxRetryTimes()) { + return false; + } + } + return true; + } + + @Override + public List findValidTaskListByProcessId(Integer processInstanceId, int testFlag) { + return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag); + } + + @Override + public List findPreviousTaskListByWorkProcessId(Integer processInstanceId) { + ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); + return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO, + processInstance.getTestFlag()); + } + + @Override + public TaskInstance findTaskInstanceById(Integer taskId) { + return taskInstanceMapper.selectById(taskId); + } + + @Override + public List findTaskInstanceByIdList(List idList) { + if (CollectionUtils.isEmpty(idList)) { + return new ArrayList<>(); + } + return taskInstanceMapper.selectBatchIds(idList); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 87c802b163..c12c1d62e4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand; @@ -36,7 +37,6 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; @@ -75,11 +75,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread { @Autowired private TaskPriorityQueue taskPriorityQueue; - /** - * processService - */ @Autowired - private ProcessService processService; + private TaskInstanceDao taskInstanceDao; /** * executor dispatcher @@ -264,7 +261,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread { * @return taskInstance is final state */ public boolean taskInstanceIsFinalState(int taskInstanceId) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); + TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); return taskInstance.getState().isFinished(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java index 51a0830561..18168056f0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.event; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; @@ -46,6 +47,9 @@ public class TaskDelayEventHandler implements TaskEventHandler { @Autowired private ProcessService processService; + @Autowired + private TaskInstanceDao taskInstanceDao; + @Autowired private WorkflowExecuteThreadPool workflowExecuteThreadPool; @@ -85,7 +89,7 @@ public class TaskDelayEventHandler implements TaskEventHandler { taskInstance.setExecutePath(taskEvent.getExecutePath()); taskInstance.setPid(taskEvent.getProcessId()); taskInstance.setAppLink(taskEvent.getAppIds()); - if (!processService.updateTaskInstance(taskInstance)) { + if (!taskInstanceDao.updateTaskInstance(taskInstance)) { throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed"); } sendAckToWorker(taskEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java index 83d11db80d..08c13f2de1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java @@ -19,12 +19,12 @@ package org.apache.dolphinscheduler.server.master.event; import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +40,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler { private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @Autowired - private ProcessService processService; + private TaskInstanceDao taskInstanceDao; @Override public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { @@ -68,7 +68,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler { taskInstance.setState(TaskExecutionStatus.DISPATCH); taskInstance.setHost(taskEvent.getWorkerAddress()); try { - if (!processService.updateTaskInstance(taskInstance)) { + if (!taskInstanceDao.updateTaskInstance(taskInstance)) { throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed"); } } catch (Exception ex) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java index 5f36248d9e..a185037ae4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.event; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; @@ -50,6 +51,9 @@ public class TaskResultEventHandler implements TaskEventHandler { @Autowired private ProcessService processService; + @Autowired + private TaskInstanceDao taskInstanceDao; + @Autowired private MasterConfig masterConfig; @@ -92,7 +96,7 @@ public class TaskResultEventHandler implements TaskEventHandler { taskInstance.setEndTime(taskEvent.getEndTime()); taskInstance.setVarPool(taskEvent.getVarPool()); processService.changeOutParam(taskInstance); - processService.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); sendAckToWorker(taskEvent); } catch (Exception ex) { TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java index 3dc86b3a37..85a10da40d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java @@ -20,13 +20,13 @@ package org.apache.dolphinscheduler.server.master.event; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; -import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Optional; @@ -47,7 +47,7 @@ public class TaskRunningEventHandler implements TaskEventHandler { private WorkflowExecuteThreadPool workflowExecuteThreadPool; @Autowired - private ProcessService processService; + private TaskInstanceDao taskInstanceDao; @Override public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { @@ -84,7 +84,7 @@ public class TaskRunningEventHandler implements TaskEventHandler { taskInstance.setExecutePath(taskEvent.getExecutePath()); taskInstance.setPid(taskEvent.getProcessId()); taskInstance.setAppLink(taskEvent.getAppIds()); - if (!processService.updateTaskInstance(taskInstance)) { + if (!taskInstanceDao.updateTaskInstance(taskInstance)) { throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed"); } sendAckToWorker(taskEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java index c1520b07df..cf80bfbd19 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java @@ -19,13 +19,13 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteStartCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteThreadPool; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +47,7 @@ public class TaskExecuteStartProcessor implements NettyRequestProcessor { private StreamTaskExecuteThreadPool streamTaskExecuteThreadPool; @Autowired - private ProcessService processService; + private TaskDefinitionDao taskDefinitionDao; @Override public void process(Channel channel, Command command) { @@ -57,7 +57,7 @@ public class TaskExecuteStartProcessor implements NettyRequestProcessor { JSONUtils.parseObject(command.getBody(), TaskExecuteStartCommand.class); logger.info("taskExecuteStartCommand: {}", taskExecuteStartCommand); - TaskDefinition taskDefinition = processService.findTaskDefinition( + TaskDefinition taskDefinition = taskDefinitionDao.findTaskDefinition( taskExecuteStartCommand.getTaskDefinitionCode(), taskExecuteStartCommand.getTaskDefinitionVersion()); if (taskDefinition == null) { logger.error("Task definition can not be found, taskDefinitionCode:{}, taskDefinitionVersion:{}", diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 7debb1f895..236dde1760 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; @@ -73,6 +75,12 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl @Autowired private ProcessInstanceDao processInstanceDao; + @Autowired + private TaskInstanceDao taskInstanceDao; + + @Autowired + private TaskDefinitionLogDao taskDefinitionLogDao; + @Autowired private MasterConfig masterConfig; @@ -183,7 +191,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl processAlertManager, masterConfig, stateWheelExecuteThread, - curingGlobalParamsService); + curingGlobalParamsService, + taskInstanceDao, + taskDefinitionLogDao); processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable); workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java index 15abef270d..ddecc35043 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -89,6 +90,8 @@ public class StreamTaskExecuteRunnable implements Runnable { protected ProcessService processService; + protected TaskInstanceDao taskInstanceDao; + protected ExecutorDispatcher dispatcher; protected ProcessTaskRelationMapper processTaskRelationMapper; @@ -118,6 +121,7 @@ public class StreamTaskExecuteRunnable implements Runnable { this.dispatcher = SpringApplicationContext.getBean(ExecutorDispatcher.class); this.taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class); this.processTaskRelationMapper = SpringApplicationContext.getBean(ProcessTaskRelationMapper.class); + this.taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class); this.streamTaskInstanceExecCacheManager = SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class); this.taskDefinition = taskDefinition; @@ -133,7 +137,7 @@ public class StreamTaskExecuteRunnable implements Runnable { // submit task processService.updateTaskDefinitionResources(taskDefinition); taskInstance = newTaskInstance(taskDefinition); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); // add cache streamTaskInstanceExecCacheManager.cache(taskInstance.getId(), this); @@ -148,7 +152,7 @@ public class StreamTaskExecuteRunnable implements Runnable { TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance); if (taskExecutionContext == null) { taskInstance.setState(TaskExecutionStatus.FAILURE); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); return; } @@ -175,7 +179,7 @@ public class StreamTaskExecuteRunnable implements Runnable { // set task instance fail taskInstance.setState(TaskExecutionStatus.FAILURE); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); return; } @@ -416,7 +420,7 @@ public class StreamTaskExecuteRunnable implements Runnable { taskInstance.setEndTime(taskEvent.getEndTime()); taskInstance.setVarPool(taskEvent.getVarPool()); processService.changeOutParam(taskInstance); - processService.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); // send ack sendAckToWorker(taskEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 175416949f..6bca587ac0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -58,6 +58,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -134,6 +136,10 @@ public class WorkflowExecuteRunnable implements Callable { private ProcessInstanceDao processInstanceDao; + private TaskInstanceDao taskInstanceDao; + + private TaskDefinitionLogDao taskDefinitionLogDao; + private final ProcessAlertManager processAlertManager; private final NettyExecutorManager nettyExecutorManager; @@ -245,7 +251,9 @@ public class WorkflowExecuteRunnable implements Callable { @NonNull ProcessAlertManager processAlertManager, @NonNull MasterConfig masterConfig, @NonNull StateWheelExecuteThread stateWheelExecuteThread, - @NonNull CuringParamsService curingParamsService) { + @NonNull CuringParamsService curingParamsService, + @NonNull TaskInstanceDao taskInstanceDao, + @NonNull TaskDefinitionLogDao taskDefinitionLogDao) { this.processService = processService; this.commandService = commandService; this.processInstanceDao = processInstanceDao; @@ -254,6 +262,8 @@ public class WorkflowExecuteRunnable implements Callable { this.processAlertManager = processAlertManager; this.stateWheelExecuteThread = stateWheelExecuteThread; this.curingParamsService = curingParamsService; + this.taskInstanceDao = taskInstanceDao; + this.taskDefinitionLogDao = taskDefinitionLogDao; this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort()); TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); } @@ -347,7 +357,7 @@ public class WorkflowExecuteRunnable implements Callable { public boolean checkForceStartAndWakeUp(StateEvent stateEvent) { TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { - TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); + TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.DISPATCH); this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), @@ -357,7 +367,7 @@ public class WorkflowExecuteRunnable implements Callable { if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue); if (acquireTaskGroup) { - TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); + TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.DISPATCH); return true; @@ -502,7 +512,7 @@ public class WorkflowExecuteRunnable implements Callable { */ public void refreshTaskInstance(int taskInstanceId) { logger.info("task instance update: {} ", taskInstanceId); - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); + TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); if (taskInstance == null) { logger.error("can not find task instance, id:{}", taskInstanceId); return; @@ -775,7 +785,7 @@ public class WorkflowExecuteRunnable implements Callable { List processTaskRelations = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); List taskDefinitionLogs = - processService.getTaskDefineLogListByRelation(processTaskRelations); + taskDefinitionLogDao.getTaskDefineLogListByRelation(processTaskRelations); List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); forbiddenTaskMap.clear(); @@ -815,7 +825,8 @@ public class WorkflowExecuteRunnable implements Callable { processInstance.getRunTimes(), processInstance.getRecovery()); List validTaskInstanceList = - processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); + taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), + processInstance.getTestFlag()); for (TaskInstance task : validTaskInstanceList) { try { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId()); @@ -831,7 +842,7 @@ public class WorkflowExecuteRunnable implements Callable { TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) { task.setFlag(Flag.NO); - processService.updateTaskInstance(task); + taskInstanceDao.updateTaskInstance(task); continue; } } @@ -852,7 +863,7 @@ public class WorkflowExecuteRunnable implements Callable { if (task.getState().isNeedFaultTolerance()) { logger.info("TaskInstance needs fault tolerance, will be added to standby list."); task.setFlag(Flag.NO); - processService.updateTaskInstance(task); + taskInstanceDao.updateTaskInstance(task); // tolerantTaskInstance add to standby list directly TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task); @@ -953,7 +964,7 @@ public class WorkflowExecuteRunnable implements Callable { if (taskInstance.getId() != oldTaskInstanceId) { TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); oldTaskInstance.setFlag(Flag.NO); - processService.updateTaskInstance(oldTaskInstance); + taskInstanceDao.updateTaskInstance(oldTaskInstance); validTaskMap.remove(taskInstance.getTaskCode()); activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); } @@ -1780,7 +1791,7 @@ public class WorkflowExecuteRunnable implements Callable { if (taskInstanceId == null || taskInstanceId.equals(0)) { continue; } - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); + TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); if (taskInstance == null || taskInstance.getState().isFinished()) { continue; } @@ -1813,7 +1824,7 @@ public class WorkflowExecuteRunnable implements Callable { } // stop tasks which is retrying if forced success happens if (task.taskCanRetry()) { - TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); + TaskInstance retryTask = taskInstanceDao.findTaskInstanceById(task.getId()); if (retryTask != null && retryTask.getState().isForceSuccess()) { task.setState(retryTask.getState()); logger.info( @@ -1888,7 +1899,7 @@ public class WorkflowExecuteRunnable implements Callable { .map(Integer::valueOf) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) { - return processService.findTaskInstanceByIdList(startTaskInstanceIds); + return taskInstanceDao.findTaskInstanceByIdList(startTaskInstanceIds); } } return Collections.emptyList(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 029ae7471a..ff39833142 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -49,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; @@ -123,6 +124,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected ProcessService processService; + protected TaskInstanceDao taskInstanceDao; + protected ProcessInstanceDao processInstanceDao; protected MasterConfig masterConfig; @@ -140,6 +143,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { masterConfig = SpringApplicationContext.getBean(MasterConfig.class); taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class); curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class); + taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class); this.taskInstance = taskInstance; this.processInstance = processInstance; this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); @@ -306,7 +310,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { if (verifyTenantIsNull(tenant, taskInstance)) { logger.info("Task state changes to {}", TaskExecutionStatus.FAILURE); taskInstance.setState(TaskExecutionStatus.FAILURE); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); return null; } // set queue for process instance, user-specified queue takes precedence over tenant queue diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java index f7eac540d9..0b4848b1df 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java @@ -77,7 +77,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor { this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); this.taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); this.taskInstance.setStartTime(new Date()); - this.processService.saveTaskInstance(taskInstance); + this.taskInstanceDao.upsertTaskInstance(taskInstance); this.dependentParameters = taskInstance.getDependency(); this.blockingParam = JSONUtils.parseObject(taskInstance.getTaskParams(), BlockingParameters.class); } @@ -87,7 +87,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor { // todo: task cannot be pause taskInstance.setState(TaskExecutionStatus.PAUSE); taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); logger.info("blocking task has been paused"); return true; } @@ -96,7 +96,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor { protected boolean killTask() { taskInstance.setState(TaskExecutionStatus.KILL); taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); logger.info("blocking task has been killed"); return true; } @@ -171,7 +171,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor { private void setConditionResult() { - List taskInstances = processService + List taskInstances = taskInstanceDao .findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag()); for (TaskInstance task : taskInstances) { completeTaskList.putIfAbsent(task.getTaskCode(), task.getState()); @@ -204,7 +204,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor { } taskInstance.setState(TaskExecutionStatus.SUCCESS); taskInstance.setEndTime(new Date()); - processService.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); logger.info("blocking task execute complete, blocking:{}", isBlocked); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index aa57548a06..cddc86ddaf 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -146,7 +146,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { public boolean killTask() { try { - taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + taskInstance = taskInstanceDao.findTaskInstanceById(taskInstance.getId()); if (taskInstance == null) { return true; } @@ -156,7 +156,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { // we don't wait the kill response taskInstance.setState(TaskExecutionStatus.KILL); taskInstance.setEndTime(new Date()); - processService.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); if (StringUtils.isNotEmpty(taskInstance.getHost())) { killRemoteTask(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index f04e950223..23e0e65071 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -99,7 +99,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { protected boolean pauseTask() { this.taskInstance.setState(TaskExecutionStatus.PAUSE); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); return true; } @@ -120,7 +120,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { protected boolean killTask() { this.taskInstance.setState(TaskExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); return true; } @@ -138,13 +138,13 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); - this.processService.saveTaskInstance(taskInstance); + this.taskInstanceDao.upsertTaskInstance(taskInstance); this.dependentParameters = taskInstance.getDependency(); } private void setConditionResult() { - List taskInstances = processService + List taskInstances = taskInstanceDao .findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag()); for (TaskInstance task : taskInstances) { completeTaskList.putIfAbsent(task.getTaskCode(), task.getState()); @@ -194,6 +194,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { (conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE; taskInstance.setState(status); taskInstance.setEndTime(new Date()); - processService.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index ddcdc33c58..2edd0e97ea 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -114,7 +114,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); - processService.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); initDependParameters(); logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate); return true; @@ -231,7 +231,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { protected boolean pauseTask() { this.taskInstance.setState(TaskExecutionStatus.PAUSE); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); return true; } @@ -239,7 +239,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { protected boolean killTask() { this.taskInstance.setState(TaskExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); return true; } @@ -290,7 +290,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { status = (result == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE; taskInstance.setState(status); taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index fe1896ec85..1828e269ea 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -133,7 +133,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { taskInstance.setState(TaskExecutionStatus.of(subProcessInstance.getState().getCode())); taskInstance.setEndTime(new Date()); dealFinish(); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); } } @@ -201,7 +201,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); - processService.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); logger.info("set sub work flow {} task {} state: {}", processInstance.getId(), taskInstance.getId(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index de571cb657..dfd5a02fb7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -80,7 +80,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); - processService.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); if (!this.taskInstance().getState().isFinished()) { setSwitchResult(); @@ -104,7 +104,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { protected boolean pauseTask() { this.taskInstance.setState(TaskExecutionStatus.PAUSE); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); return true; } @@ -112,7 +112,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { protected boolean killTask() { this.taskInstance.setState(TaskExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); return true; } @@ -127,7 +127,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { } private boolean setSwitchResult() { - List taskInstances = processService.findValidTaskListByProcessId( + List taskInstances = taskInstanceDao.findValidTaskListByProcessId( taskInstance.getProcessInstanceId(), processInstance.getTestFlag()); Map completeTaskList = new HashMap<>(); for (TaskInstance task : taskInstances) { @@ -188,7 +188,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { (conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE; taskInstance.setEndTime(new Date()); taskInstance.setState(status); - processService.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); } public String setTaskParams(String content, String rgex) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index cf28a6bc8e..5d6048c208 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; @@ -75,12 +76,15 @@ public class MasterFailoverService { private final LogClient logClient; + private final TaskInstanceDao taskInstanceDao; + public MasterFailoverService(@NonNull RegistryClient registryClient, @NonNull MasterConfig masterConfig, @NonNull ProcessService processService, @NonNull NettyExecutorManager nettyExecutorManager, @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager, - @NonNull LogClient logClient) { + @NonNull LogClient logClient, + @NonNull TaskInstanceDao taskInstanceDao) { this.registryClient = registryClient; this.masterConfig = masterConfig; this.processService = processService; @@ -88,7 +92,7 @@ public class MasterFailoverService { this.localAddress = masterConfig.getMasterAddress(); this.processInstanceExecCacheManager = processInstanceExecCacheManager; this.logClient = logClient; - + this.taskInstanceDao = taskInstanceDao; } /** @@ -164,7 +168,7 @@ public class MasterFailoverService { processInstance.setProcessDefinition(processDefinition); int processInstanceId = processInstance.getId(); List taskInstanceList = - processService.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); + taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); for (TaskInstance taskInstance : taskInstanceList) { try { LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId()); @@ -249,7 +253,7 @@ public class MasterFailoverService { } taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); } private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index 18de752b19..30522b8e4a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder; @@ -73,12 +74,15 @@ public class WorkerFailoverService { private final LogClient logClient; private final String localAddress; + private final TaskInstanceDao taskInstanceDao; + public WorkerFailoverService(@NonNull RegistryClient registryClient, @NonNull MasterConfig masterConfig, @NonNull ProcessService processService, @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool, @NonNull ProcessInstanceExecCacheManager cacheManager, - @NonNull LogClient logClient) { + @NonNull LogClient logClient, + @NonNull TaskInstanceDao taskInstanceDao) { this.registryClient = registryClient; this.masterConfig = masterConfig; this.processService = processService; @@ -86,6 +90,7 @@ public class WorkerFailoverService { this.cacheManager = cacheManager; this.logClient = logClient; this.localAddress = masterConfig.getMasterAddress(); + this.taskInstanceDao = taskInstanceDao; } /** @@ -183,7 +188,7 @@ public class WorkerFailoverService { taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE); taskInstance.setFlag(Flag.NO); - processService.saveTaskInstance(taskInstance); + taskInstanceDao.upsertTaskInstance(taskInstance); TaskStateEvent stateEvent = TaskStateEvent.builder() .processInstanceId(processInstance.getId()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java index 3932dd693f..a28ee36cb0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -48,6 +49,8 @@ public class DependentExecute { */ private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + private final TaskInstanceDao taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class); + /** * depend item list */ @@ -154,7 +157,7 @@ public class DependentExecute { DependResult result; TaskInstance taskInstance = null; List taskInstanceList = - processService.findValidTaskListByProcessId(processInstance.getId(), testFlag); + taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), testFlag); for (TaskInstance task : taskInstanceList) { if (task.getTaskCode() == taskCode) { diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java index 1429ec1567..c36f38d8f2 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java @@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; @@ -62,6 +64,10 @@ public class BlockingTaskTest { private ProcessService processService; + private TaskInstanceDao taskInstanceDao; + + private TaskDefinitionDao taskDefinitionDao; + private ProcessInstance processInstance; private MasterConfig config; @@ -80,9 +86,14 @@ public class BlockingTaskTest { // mock process service processService = Mockito.mock(ProcessService.class); - Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService); + taskInstanceDao = Mockito.mock(TaskInstanceDao.class); + Mockito.when(SpringApplicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao); + + taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class); + Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao); + // mock process instance processInstance = getProcessInstance(); Mockito.when(processService @@ -93,7 +104,7 @@ public class BlockingTaskTest { taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN); taskDefinition.setTimeout(0); - Mockito.when(processService.findTaskDefinition(1L, 1)) + Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1)) .thenReturn(taskDefinition); } @@ -187,24 +198,23 @@ public class BlockingTaskTest { .submitTask(processInstance, taskInstance)) .thenReturn(taskInstance); - Mockito.when(processService + Mockito.when(taskInstanceDao .findTaskInstanceById(taskInstance.getId())) .thenReturn(taskInstance); // for BlockingTaskExecThread.initTaskParameters - Mockito.when(processService - .saveTaskInstance(taskInstance)) + Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance)) .thenReturn(true); // for BlockingTaskExecThread.updateTaskState - Mockito.when(processService + Mockito.when(taskInstanceDao .updateTaskInstance(taskInstance)) .thenReturn(true); // for BlockingTaskExecThread.waitTaskQuit List conditions = getTaskInstanceForValidTaskList(expectResults); Mockito.when( - processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())) + taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())) .thenReturn(conditions); return taskInstance; } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java index 035a811f76..ae8b6022a0 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; @@ -61,6 +63,10 @@ public class ConditionsTaskTest { private ProcessInstance processInstance; + private TaskInstanceDao taskInstanceDao; + + private TaskDefinitionDao taskDefinitionDao; + @BeforeEach public void before() { ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); @@ -75,6 +81,12 @@ public class ConditionsTaskTest { processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + taskInstanceDao = Mockito.mock(TaskInstanceDao.class); + Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao); + + taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class); + Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao); + processInstance = getProcessInstance(); Mockito.when(processService .findProcessInstanceById(processInstance.getId())) @@ -84,7 +96,7 @@ public class ConditionsTaskTest { taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN); taskDefinition.setTimeout(0); - Mockito.when(processService.findTaskDefinition(1L, 1)) + Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1)) .thenReturn(taskDefinition); } @@ -96,22 +108,21 @@ public class ConditionsTaskTest { .submitTask(processInstance, taskInstance)) .thenReturn(taskInstance); // for MasterBaseTaskExecThread.call - Mockito.when(processService + Mockito.when(taskInstanceDao .findTaskInstanceById(taskInstance.getId())) .thenReturn(taskInstance); // for ConditionsTaskExecThread.initTaskParameters - Mockito.when(processService - .saveTaskInstance(taskInstance)) + Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance)) .thenReturn(true); // for ConditionsTaskExecThread.updateTaskState - Mockito.when(processService + Mockito.when(taskInstanceDao .updateTaskInstance(taskInstance)) .thenReturn(true); // for ConditionsTaskExecThread.waitTaskQuit List conditions = Stream.of( getTaskInstanceForValidTaskList(expectResult)).collect(Collectors.toList()); - Mockito.when(processService + Mockito.when(taskInstanceDao .findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())) .thenReturn(conditions); return taskInstance; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index e20e7c5572..0cfed4545c 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -70,6 +72,10 @@ public class DependentTaskTest { private ProcessService processService; + private TaskInstanceDao taskInstanceDao; + + private TaskDefinitionDao taskDefinitionDao; + /** * the dependent task to be tested * ProcessDefinition id=1 @@ -95,6 +101,12 @@ public class DependentTaskTest { processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + taskInstanceDao = Mockito.mock(TaskInstanceDao.class); + Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao); + + taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class); + Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao); + processInstance = getProcessInstance(); taskInstance = getTaskInstance(); @@ -110,20 +122,19 @@ public class DependentTaskTest { .thenAnswer(i -> taskInstance); // for DependentTaskExecThread.initTaskParameters - Mockito.when(processService + Mockito.when(taskInstanceDao .updateTaskInstance(Mockito.any())) .thenReturn(true); // for DependentTaskExecThread.updateTaskState - Mockito.when(processService - .saveTaskInstance(Mockito.any())) + Mockito.when(taskInstanceDao.upsertTaskInstance(Mockito.any())) .thenReturn(true); // for DependentTaskExecThread.waitTaskQuit - Mockito.when(processService + Mockito.when(taskInstanceDao .findTaskInstanceById(1000)) .thenAnswer(i -> taskInstance); - Mockito.when(processService.findTaskDefinition(TASK_CODE, TASK_VERSION)) + Mockito.when(taskDefinitionDao.findTaskDefinition(TASK_CODE, TASK_VERSION)) .thenReturn(getTaskDefinition()); } @@ -155,7 +166,7 @@ public class DependentTaskTest { .thenReturn(dependentProcessInstance); // for DependentExecute.getDependTaskResult - Mockito.when(processService + Mockito.when(taskInstanceDao .findValidTaskListByProcessId(200, 0)) .thenReturn(Stream.of( getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_A, @@ -177,7 +188,7 @@ public class DependentTaskTest { .thenReturn(dependentProcessInstance); // for DependentExecute.getDependTaskResult - Mockito.when(processService + Mockito.when(taskInstanceDao .findValidTaskListByProcessId(200, 0)) .thenReturn(Stream.of( getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A, @@ -228,13 +239,13 @@ public class DependentTaskTest { .thenReturn(processInstance300); // for DependentExecute.getDependTaskResult - Mockito.when(processService + Mockito.when(taskInstanceDao .findValidTaskListByProcessId(200, 0)) .thenReturn(Stream.of( getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A, processInstance200)) .collect(Collectors.toList())); - Mockito.when(processService + Mockito.when(taskInstanceDao .findValidTaskListByProcessId(300, 0)) .thenReturn(Stream.of( getTaskInstanceForValidTaskList(3000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_B, @@ -321,7 +332,7 @@ public class DependentTaskTest { // DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); // for DependentExecute.getDependTaskResult - Mockito.when(processService + Mockito.when(taskInstanceDao .findValidTaskListByProcessId(200, 0)) .thenAnswer(i -> { processInstance.setState(WorkflowExecutionStatus.READY_STOP); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java index 5694988847..4c2fafd63c 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; @@ -55,6 +56,8 @@ public class SubProcessTaskTest { private ProcessService processService; + private TaskInstanceDao taskInstanceDao; + private ProcessInstance processInstance; private MockedStatic mockedStaticServerLifeCycleManager; @@ -72,11 +75,14 @@ public class SubProcessTaskTest { processService = Mockito.mock(ProcessService.class); Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService); + taskInstanceDao = Mockito.mock(TaskInstanceDao.class); + Mockito.when(SpringApplicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao); + mockedStaticServerLifeCycleManager = Mockito.mockStatic(ServerLifeCycleManager.class); Mockito.when(ServerLifeCycleManager.isStopped()).thenReturn(false); processInstance = getProcessInstance(); - Mockito.when(processService + Mockito.when(taskInstanceDao .updateTaskInstance(Mockito.any())) .thenReturn(true); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java index f5cd14decd..9bc05d283d 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java @@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; @@ -53,6 +55,10 @@ public class SwitchTaskTest { private ProcessInstance processInstance; + private TaskInstanceDao taskInstanceDao; + + private TaskDefinitionDao taskDefinitionDao; + @BeforeEach public void before() { ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); @@ -67,6 +73,12 @@ public class SwitchTaskTest { processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + taskInstanceDao = Mockito.mock(TaskInstanceDao.class); + Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao); + + taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class); + Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao); + processInstance = getProcessInstance(); Mockito.when(processService .findProcessInstanceById(processInstance.getId())) @@ -78,7 +90,7 @@ public class SwitchTaskTest { taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN); taskDefinition.setTimeout(0); - Mockito.when(processService.findTaskDefinition(1L, 1)) + Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1)) .thenReturn(taskDefinition); TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance); @@ -87,15 +99,14 @@ public class SwitchTaskTest { .submitTask(processInstance, taskInstance)) .thenReturn(taskInstance); // for MasterBaseTaskExecThread.call - Mockito.when(processService + Mockito.when(taskInstanceDao .findTaskInstanceById(taskInstance.getId())) .thenReturn(taskInstance); // for SwitchTaskExecThread.initTaskParameters - Mockito.when(processService - .saveTaskInstance(taskInstance)) + Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance)) .thenReturn(true); // for SwitchTaskExecThread.updateTaskState - Mockito.when(processService + Mockito.when(taskInstanceDao .updateTaskInstance(taskInstance)) .thenReturn(true); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index bbd4a321cd..0af8f73e7b 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -60,6 +61,9 @@ public class TaskPriorityQueueConsumerTest { @Autowired private ProcessService processService; + @Autowired + private TaskInstanceDao taskInstanceDao; + @Autowired private ExecutorDispatcher dispatcher; @@ -236,7 +240,7 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); - Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1); Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1); Assertions.assertNotNull(state); @@ -264,7 +268,7 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup"); taskPriorityQueue.put(taskPriority); @@ -302,7 +306,7 @@ public class TaskPriorityQueueConsumerTest { taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); taskInstance.setTaskDefine(taskDefinition); - Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1); TaskPriority taskPriority = new TaskPriority(); taskPriority.setTaskId(1); @@ -333,7 +337,7 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup"); taskPriorityQueue.put(taskPriority); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index 4cf5900f99..f76109325c 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; @@ -71,6 +73,9 @@ public class WorkflowExecuteRunnableTest { private ProcessInstance processInstance; + private TaskInstanceDao taskInstanceDao; + + private TaskDefinitionLogDao taskDefinitionLogDao; private ProcessService processService; private CommandService commandService; @@ -96,6 +101,8 @@ public class WorkflowExecuteRunnableTest { commandService = Mockito.mock(CommandService.class); processInstanceDao = Mockito.mock(ProcessInstanceDao.class); processInstance = Mockito.mock(ProcessInstance.class); + taskInstanceDao = Mockito.mock(TaskInstanceDao.class); + taskDefinitionLogDao = Mockito.mock(TaskDefinitionLogDao.class); Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00"); @@ -111,7 +118,8 @@ public class WorkflowExecuteRunnableTest { workflowExecuteThread = Mockito.spy( new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao, nettyExecutorManager, - processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService)); + processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService, + taskInstanceDao, taskDefinitionLogDao)); Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); dag.setAccessible(true); dag.set(workflowExecuteThread, new DAG()); @@ -146,7 +154,7 @@ public class WorkflowExecuteRunnableTest { taskInstance4.setId(4); Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4"); - Mockito.when(processService.findTaskInstanceByIdList( + Mockito.when(taskInstanceDao.findTaskInstanceByIdList( Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))) .thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4)); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java index 0217748fa4..0f96a98f85 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java @@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; @@ -56,6 +58,10 @@ public class CommonTaskProcessorTest { private ProcessService processService; + private TaskInstanceDao taskInstanceDao; + + private TaskDefinitionDao taskDefinitionDao; + private CommonTaskProcessor commonTaskProcessor; @BeforeEach @@ -69,6 +75,12 @@ public class CommonTaskProcessorTest { processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + taskInstanceDao = Mockito.mock(TaskInstanceDao.class); + Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao); + + taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class); + Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao); + commonTaskProcessor = Mockito.mock(CommonTaskProcessor.class); Mockito.when(applicationContext.getBean(CommonTaskProcessor.class)).thenReturn(commonTaskProcessor); @@ -76,7 +88,7 @@ public class CommonTaskProcessorTest { taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN); taskDefinition.setTimeout(0); - Mockito.when(processService.findTaskDefinition(1L, 1)) + Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1)) .thenReturn(taskDefinition); } @@ -107,7 +119,7 @@ public class CommonTaskProcessorTest { TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); taskInstance.setTaskDefine(taskDefinition); - Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1); TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance); Assertions.assertNull(taskExecutionContext); } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index f7b41467bd..2e9dc23936 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -73,6 +74,9 @@ public class FailoverServiceTest { @Mock private ProcessService processService; + @Mock + private TaskInstanceDao taskInstanceDao; + @Mock private WorkflowExecuteThreadPool workflowExecuteThreadPool; @@ -108,13 +112,14 @@ public class FailoverServiceTest { given(masterConfig.getMasterAddress()).willReturn(testMasterHost); MasterFailoverService masterFailoverService = new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, - processInstanceExecCacheManager, logClient); + processInstanceExecCacheManager, logClient, taskInstanceDao); WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager, - logClient); + logClient, + taskInstanceDao); failoverService = new FailoverService(masterFailoverService, workerFailoverService); @@ -150,7 +155,7 @@ public class FailoverServiceTest { given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())) .willReturn(Arrays.asList(processInstance)); doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class)); - given(processService.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt())) + given(taskInstanceDao.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt())) .willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance)); Thread.sleep(1000); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 6f85ed3e0b..76a6e6ea52 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; @@ -70,8 +69,6 @@ public interface ProcessService { Optional findProcessInstanceDetailById(int processId); - List getTaskNodeListByDefinition(long defineCode); - ProcessInstance findProcessInstanceById(int processId); ProcessDefinition findProcessDefineById(int processDefinitionId); @@ -104,36 +101,12 @@ public interface ProcessService { void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task); - TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance); - - TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance); - - boolean saveTaskInstance(TaskInstance taskInstance); - - boolean createTaskInstance(TaskInstance taskInstance); - - boolean updateTaskInstance(TaskInstance taskInstance); - - TaskInstance findTaskInstanceById(Integer taskId); - - List findTaskInstanceByIdList(List idList); - void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance); void updateTaskDefinitionResources(TaskDefinition taskDefinition); List findTaskIdByInstanceState(int instanceId, TaskExecutionStatus state); - List findValidTaskListByProcessId(Integer processInstanceId, int testFlag); - - List findPreviousTaskListByWorkProcessId(Integer processInstanceId); - - int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap); - - int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap); - - ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId); - int deleteWorkProcessMapByParentId(int parentWorkProcessId); ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId); @@ -157,8 +130,6 @@ public interface ProcessService { @Transactional void processNeedFailoverProcessInstances(ProcessInstance processInstance); - List queryNeedFailoverTaskInstances(String host); - DataSource findDataSourceById(int id); ProcessInstance findProcessInstanceByTaskId(int taskId); @@ -216,12 +187,6 @@ public interface ProcessService { DagData genDagData(ProcessDefinition processDefinition); - List genTaskDefineList(List processTaskRelations); - - List getTaskDefineLogListByRelation(List processTaskRelations); - - TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion); - List findRelationByCode(long processDefinitionCode, int processDefinitionVersion); List transformTask(List taskRelationList, diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index dd65097585..ea3f5fd7a1 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.common.constants.CommandKeyConstants; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.TaskDependType; @@ -110,6 +109,10 @@ import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.DqRuleUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -193,6 +196,18 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private ProcessInstanceDao processInstanceDao; + @Autowired + private TaskDefinitionDao taskDefinitionDao; + + @Autowired + private TaskInstanceDao taskInstanceDao; + + @Autowired + private TaskDefinitionLogDao taskDefinitionLogDao; + + @Autowired + private ProcessInstanceMapDao processInstanceMapDao; + @Autowired private DataSourceMapper dataSourceMapper; @@ -399,32 +414,6 @@ public class ProcessServiceImpl implements ProcessService { return Optional.ofNullable(processInstanceMapper.queryDetailById(processId)); } - /** - * get task node list by definitionId - */ - @Override - public List getTaskNodeListByDefinition(long defineCode) { - ProcessDefinition processDefinition = processDefineMapper.queryByCode(defineCode); - if (processDefinition == null) { - logger.error("process define not exists"); - return Lists.newArrayList(); - } - List processTaskRelations = processTaskRelationLogMapper - .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); - Set taskDefinitionSet = new HashSet<>(); - for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) { - if (processTaskRelation.getPostTaskCode() > 0) { - taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), - processTaskRelation.getPostTaskVersion())); - } - } - if (taskDefinitionSet.isEmpty()) { - return Lists.newArrayList(); - } - List taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); - return Lists.newArrayList(taskDefinitionLogs); - } - /** * find process instance by id * @@ -516,7 +505,7 @@ public class ProcessServiceImpl implements ProcessService { public void removeTaskLogFile(Integer processInstanceId) { ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); List taskInstanceList = - findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); + taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); if (CollectionUtils.isEmpty(taskInstanceList)) { return; } @@ -538,7 +527,7 @@ public class ProcessServiceImpl implements ProcessService { public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) { ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); List taskInstanceList = - findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); + taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); if (CollectionUtils.isEmpty(taskInstanceList)) { return; } @@ -560,7 +549,7 @@ public class ProcessServiceImpl implements ProcessService { */ @Override public void recurseFindSubProcess(long parentCode, List ids) { - List taskNodeList = this.getTaskNodeListByDefinition(parentCode); + List taskNodeList = taskDefinitionDao.getTaskDefinitionListByDefinition(parentCode); if (taskNodeList != null && !taskNodeList.isEmpty()) { @@ -862,7 +851,7 @@ public class ProcessServiceImpl implements ProcessService { failedList.addAll(killedList); failedList.addAll(toleranceList); for (Integer taskId : failedList) { - initTaskInstance(this.findTaskInstanceById(taskId)); + initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId)); } cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, String.join(Constants.COMMA, convertIntListToString(failedList))); @@ -880,7 +869,7 @@ public class ProcessServiceImpl implements ProcessService { TaskExecutionStatus.KILL); for (Integer taskId : stopNodeList) { // initialize the pause state - initTaskInstance(this.findTaskInstanceById(taskId)); + initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId)); } cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, String.join(Constants.COMMA, convertIntListToString(stopNodeList))); @@ -897,10 +886,11 @@ public class ProcessServiceImpl implements ProcessService { // delete all the valid tasks when complement data if id is not null if (processInstance.getId() != null) { List taskInstanceList = - this.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); + taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), + processInstance.getTestFlag()); for (TaskInstance taskInstance : taskInstanceList) { taskInstance.setFlag(Flag.NO); - this.updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); } } break; @@ -912,10 +902,11 @@ public class ProcessServiceImpl implements ProcessService { } // delete all the valid tasks when repeat running List validTaskList = - findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); + taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), + processInstance.getTestFlag()); for (TaskInstance taskInstance : validTaskList) { taskInstance.setFlag(Flag.NO); - updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); } processInstance.setStartTime(new Date()); processInstance.setRestartTime(processInstance.getStartTime()); @@ -1067,7 +1058,7 @@ public class ProcessServiceImpl implements ProcessService { // update sub process id to process map table processInstanceMap.setProcessInstanceId(subProcessInstance.getId()); - this.updateWorkProcessInstanceMap(processInstanceMap); + processInstanceMapDao.updateWorkProcessInstanceMap(processInstanceMap); } /** @@ -1129,11 +1120,11 @@ public class ProcessServiceImpl implements ProcessService { if (!taskInstance.isSubProcess() && (taskInstance.getState().isKill() || taskInstance.getState().isFailure())) { taskInstance.setFlag(Flag.NO); - updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); return; } taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS); - updateTaskInstance(taskInstance); + taskInstanceDao.updateTaskInstance(taskInstance); } /** @@ -1183,7 +1174,7 @@ public class ProcessServiceImpl implements ProcessService { taskInstance.getProcessInstanceId(), processInstance.getState()); // submit to db - TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); + TaskInstance task = taskInstanceDao.submitTaskInstanceToDB(taskInstance, processInstance); if (task == null) { logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ", taskInstance.getName(), @@ -1227,7 +1218,7 @@ public class ProcessServiceImpl implements ProcessService { processMap = findPreviousTaskProcessMap(parentInstance, parentTask); if (processMap != null) { processMap.setParentTaskInstanceId(parentTask.getId()); - updateWorkProcessInstanceMap(processMap); + processInstanceMapDao.updateWorkProcessInstanceMap(processMap); return processMap; } } @@ -1235,7 +1226,7 @@ public class ProcessServiceImpl implements ProcessService { processMap = new ProcessInstanceMap(); processMap.setParentProcessInstanceId(parentInstance.getId()); processMap.setParentTaskInstanceId(parentTask.getId()); - createWorkProcessInstanceMap(processMap); + processInstanceMapDao.createWorkProcessInstanceMap(processMap); return processMap; } @@ -1250,11 +1241,13 @@ public class ProcessServiceImpl implements ProcessService { TaskInstance parentTask) { Integer preTaskId = 0; - List preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId()); + List preTaskList = + taskInstanceDao.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId()); for (TaskInstance task : preTaskList) { if (task.getName().equals(parentTask.getName())) { preTaskId = task.getId(); - ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId); + ProcessInstanceMap map = + processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId); if (map != null) { return map; } @@ -1277,7 +1270,8 @@ public class ProcessServiceImpl implements ProcessService { return; } // check create sub work flow firstly - ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId()); + ProcessInstanceMap instanceMap = + processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId()); if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) { // recover failover tolerance would not create a new command when the sub command already have been created @@ -1347,165 +1341,6 @@ public class ProcessServiceImpl implements ProcessService { } } - /** - * submit task to mysql - * - * @param taskInstance taskInstance - * @param processInstance processInstance - * @return task instance - */ - @Override - public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) { - WorkflowExecutionStatus processInstanceState = processInstance.getState(); - if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) { - logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}", - processInstance.getId(), - processInstanceState, - taskInstance.getTaskCode()); - return null; - } - if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) { - taskInstance.setState(TaskExecutionStatus.PAUSE); - } - taskInstance.setExecutorId(processInstance.getExecutorId()); - taskInstance.setState(getSubmitTaskState(taskInstance, processInstance)); - if (taskInstance.getSubmitTime() == null) { - taskInstance.setSubmitTime(new Date()); - } - if (taskInstance.getFirstSubmitTime() == null) { - taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime()); - } - boolean saveResult = saveTaskInstance(taskInstance); - if (!saveResult) { - return null; - } - return taskInstance; - } - - /** - * get submit task instance state by the work process state - * cannot modify the task state when running/kill/submit success, or this - * task instance is already exists in task queue . - * return pause if work process state is ready pause - * return stop if work process state is ready stop - * if all of above are not satisfied, return submit success - * - * @param taskInstance taskInstance - * @param processInstance processInstance - * @return process instance state - */ - @Override - public TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) { - TaskExecutionStatus state = taskInstance.getState(); - // running, delayed or killed - // the task already exists in task queue - // return state - if (state == TaskExecutionStatus.RUNNING_EXECUTION - || state == TaskExecutionStatus.DELAY_EXECUTION - || state == TaskExecutionStatus.KILL - || state == TaskExecutionStatus.DISPATCH) { - return state; - } - // return pasue /stop if process instance state is ready pause / stop - // or return submit success - if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) { - state = TaskExecutionStatus.PAUSE; - } else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP - || !checkProcessStrategy(taskInstance, processInstance)) { - state = TaskExecutionStatus.KILL; - } else { - state = TaskExecutionStatus.SUBMITTED_SUCCESS; - } - return state; - } - - /** - * check process instance strategy - * - * @param taskInstance taskInstance - * @return check strategy result - */ - private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) { - FailureStrategy failureStrategy = processInstance.getFailureStrategy(); - if (failureStrategy == FailureStrategy.CONTINUE) { - return true; - } - List taskInstances = - this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag()); - - for (TaskInstance task : taskInstances) { - if (task.getState() == TaskExecutionStatus.FAILURE - && task.getRetryTimes() >= task.getMaxRetryTimes()) { - return false; - } - } - return true; - } - - /** - * insert or update task instance - * - * @param taskInstance taskInstance - * @return save task instance result - */ - @Override - public boolean saveTaskInstance(TaskInstance taskInstance) { - if (taskInstance.getId() != null) { - return updateTaskInstance(taskInstance); - } else { - return createTaskInstance(taskInstance); - } - } - - /** - * insert task instance - * - * @param taskInstance taskInstance - * @return create task instance result - */ - @Override - public boolean createTaskInstance(TaskInstance taskInstance) { - int count = taskInstanceMapper.insert(taskInstance); - return count > 0; - } - - /** - * update task instance - * - * @param taskInstance taskInstance - * @return update task instance result - */ - @Override - public boolean updateTaskInstance(TaskInstance taskInstance) { - int count = taskInstanceMapper.updateById(taskInstance); - return count > 0; - } - - /** - * find task instance by id - * - * @param taskId task id - * @return task instance - */ - @Override - public TaskInstance findTaskInstanceById(Integer taskId) { - return taskInstanceMapper.selectById(taskId); - } - - /** - * find task instance list by id list - * - * @param idList task id list - * @return task instance list - */ - @Override - public List findTaskInstanceByIdList(List idList) { - if (CollectionUtils.isEmpty(idList)) { - return new ArrayList<>(); - } - return taskInstanceMapper.selectBatchIds(idList); - } - /** * package task instance */ @@ -1514,7 +1349,7 @@ public class ProcessServiceImpl implements ProcessService { taskInstance.setProcessInstance(processInstance); taskInstance.setProcessDefine(processInstance.getProcessDefinition()); taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority()); - TaskDefinition taskDefinition = this.findTaskDefinition( + TaskDefinition taskDefinition = taskDefinitionDao.findTaskDefinition( taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); this.updateTaskDefinitionResources(taskDefinition); @@ -1603,69 +1438,6 @@ public class ProcessServiceImpl implements ProcessService { return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.getCode()); } - /** - * find valid task list by process definition id - * - * @param processInstanceId processInstanceId - * @param testFlag testFlag - * @return task instance list - */ - @Override - public List findValidTaskListByProcessId(Integer processInstanceId, int testFlag) { - return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag); - } - - /** - * find previous task list by work process id - * - * @param processInstanceId processInstanceId - * @return task instance list - */ - @Override - public List findPreviousTaskListByWorkProcessId(Integer processInstanceId) { - ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); - return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO, - processInstance.getTestFlag()); - } - - /** - * update work process instance map - * - * @param processInstanceMap processInstanceMap - * @return update process instance result - */ - @Override - public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) { - return processInstanceMapMapper.updateById(processInstanceMap); - } - - /** - * create work process instance map - * - * @param processInstanceMap processInstanceMap - * @return create process instance result - */ - @Override - public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) { - int count = 0; - if (processInstanceMap != null) { - return processInstanceMapMapper.insert(processInstanceMap); - } - return count; - } - - /** - * find work process map by parent process id and parent task id. - * - * @param parentWorkProcessId parentWorkProcessId - * @param parentTaskId parentTaskId - * @return process instance map - */ - @Override - public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { - return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId); - } - /** * delete work process map by parent process id * @@ -1861,18 +1633,6 @@ public class ProcessServiceImpl implements ProcessService { commandService.createCommand(cmd); } - /** - * query all need failover task instances by host - * - * @param host host - * @return task instance list - */ - @Override - public List queryNeedFailoverTaskInstances(String host) { - return taskInstanceMapper.queryByHostAndStatus(host, - TaskExecutionStatus.getNeedFailoverWorkflowInstanceState()); - } - /** * find data source by id * @@ -2252,8 +2012,8 @@ public class ProcessServiceImpl implements ProcessService { if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) { resourceIds = params.getResourceFilesList().stream() - .filter(t -> t.getId() != null) .map(ResourceInfo::getId) + .filter(Objects::nonNull) .collect(toSet()); } if (CollectionUtils.isEmpty(resourceIds)) { @@ -2477,57 +2237,12 @@ public class ProcessServiceImpl implements ProcessService { public DagData genDagData(ProcessDefinition processDefinition) { List taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); - List taskDefinitionLogList = genTaskDefineList(taskRelations); + List taskDefinitionLogList = taskDefinitionLogDao.getTaskDefineLogList(taskRelations); List taskDefinitions = taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList()); return new DagData(processDefinition, taskRelations, taskDefinitions); } - @Override - public List genTaskDefineList(List processTaskRelations) { - Set taskDefinitionSet = new HashSet<>(); - for (ProcessTaskRelation processTaskRelation : processTaskRelations) { - if (processTaskRelation.getPreTaskCode() > 0) { - taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), - processTaskRelation.getPreTaskVersion())); - } - if (processTaskRelation.getPostTaskCode() > 0) { - taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), - processTaskRelation.getPostTaskVersion())); - } - } - if (taskDefinitionSet.isEmpty()) { - return Lists.newArrayList(); - } - return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); - } - - @Override - public List getTaskDefineLogListByRelation(List processTaskRelations) { - List taskDefinitionLogs = new ArrayList<>(); - Map taskCodeVersionMap = new HashMap<>(); - for (ProcessTaskRelation processTaskRelation : processTaskRelations) { - if (processTaskRelation.getPreTaskCode() > 0) { - taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion()); - } - if (processTaskRelation.getPostTaskCode() > 0) { - taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()); - } - } - taskCodeVersionMap.forEach((code, version) -> { - taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(code, version)); - }); - return taskDefinitionLogs; - } - - /** - * find task definition by code and version - */ - @Override - public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) { - return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion); - } - /** * find process task relation list by process */ @@ -2571,7 +2286,7 @@ public class ProcessServiceImpl implements ProcessService { }); } if (CollectionUtils.isEmpty(taskDefinitionLogs)) { - taskDefinitionLogs = genTaskDefineList(taskRelationList); + taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelationList); } Map taskDefinitionLogMap = taskDefinitionLogs.stream() .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); @@ -2626,7 +2341,7 @@ public class ProcessServiceImpl implements ProcessService { return processTaskMap; } ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId()); - TaskInstance fatherTask = this.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId()); + TaskInstance fatherTask = taskInstanceDao.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId()); if (fatherProcess != null) { processTaskMap.put(fatherProcess, fatherTask); @@ -2936,12 +2651,13 @@ public class ProcessServiceImpl implements ProcessService { if (processInstance != null && (processInstance.getState().isFailure() || processInstance.getState().isStop())) { List validTaskList = - findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); + taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), + processInstance.getTestFlag()); List instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList()); List taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); - List taskDefinitionLogs = genTaskDefineList(taskRelations); + List taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelations); List definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES) .map(TaskDefinitionLog::getCode).collect(Collectors.toList()); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 96c153ec10..5ad590a24f 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -66,6 +66,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType; @@ -121,6 +124,16 @@ public class ProcessServiceTest { private ProcessInstanceMapper processInstanceMapper; @Mock private ProcessInstanceDao processInstanceDao; + + @Mock + private TaskInstanceDao taskInstanceDao; + + @Mock + private TaskDefinitionLogDao taskDefinitionLogDao; + + @Mock + private TaskDefinitionDao taskDefinitionDao; + @Mock private UserMapper userMapper; @Mock @@ -652,7 +665,7 @@ public class ProcessServiceTest { taskDefinitionLogs.add(taskDefinition); taskDefinitionLogs.add(td2); - Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs); + Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(taskDefinitionLogs); Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())) .thenReturn(list); @@ -729,23 +742,6 @@ public class ProcessServiceTest { } - @Test - public void testFindTaskInstanceByIdList() { - List emptyList = new ArrayList<>(); - Mockito.when(taskInstanceMapper.selectBatchIds(emptyList)).thenReturn(new ArrayList<>()); - Assertions.assertEquals(0, processService.findTaskInstanceByIdList(emptyList).size()); - - List idList = Collections.singletonList(1); - TaskInstance instance = new TaskInstance(); - instance.setId(1); - - Mockito.when(taskInstanceMapper.selectBatchIds(idList)).thenReturn(Collections.singletonList(instance)); - List taskInstanceByIdList = processService.findTaskInstanceByIdList(idList); - - Assertions.assertEquals(1, taskInstanceByIdList.size()); - Assertions.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId()); - } - @Test public void testFindLastManualProcessInterval() { long definitionCode = 1L;