From 89742332a8c19ee8d52a2dc786b460f4fc96ee62 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 1 Feb 2024 17:13:58 +0800 Subject: [PATCH] Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time (#15528) --- .github/workflows/backend.yml | 2 +- .../api/controller/TaskGroupController.java | 26 +- .../api/service/TaskGroupService.java | 8 - .../api/service/impl/ExecutorServiceImpl.java | 49 +- .../impl/TaskGroupQueueServiceImpl.java | 18 +- .../service/impl/TaskGroupServiceImpl.java | 23 +- .../api/service/TaskGroupServiceTest.java | 9 +- .../dao/entity/TaskGroup.java | 33 +- .../dao/mapper/TaskGroupMapper.java | 32 +- .../dao/mapper/TaskGroupQueueMapper.java | 17 + .../dao/repository/TaskGroupDao.java | 62 +++ .../dao/repository/TaskGroupQueueDao.java | 64 +++ .../dao/repository/impl/TaskGroupDaoImpl.java | 70 +++ .../impl/TaskGroupQueueDaoImpl.java | 73 +++ .../dao/mapper/TaskGroupMapper.xml | 58 +-- .../dao/mapper/TaskGroupQueueMapper.xml | 61 ++- .../resources/sql/dolphinscheduler_h2.sql | 1 + .../resources/sql/dolphinscheduler_mysql.sql | 1 + .../sql/dolphinscheduler_postgresql.sql | 2 + .../mysql/dolphinscheduler_ddl.sql | 17 +- .../postgresql/dolphinscheduler_ddl.sql | 4 +- .../mysql/dolphinscheduler_ddl.sql | 17 +- .../postgresql/dolphinscheduler_ddl.sql | 4 +- .../dao/mapper/TaskGroupMapperTest.java | 9 +- .../repository/impl/TaskGroupDaoImplTest.java | 118 +++++ .../impl/TaskGroupQueueDaoImplTest.java | 108 +++++ .../master/ILogicTaskInstanceOperator.java | 9 - .../master/IWorkflowInstanceService.java | 5 + .../TaskInstanceForceStartRequest.java | 42 -- .../TaskInstanceForceStartResponse.java | 41 -- .../TaskInstanceWakeupRequest.java | 19 +- .../server/master/MasterServer.java | 5 + .../event/StateEventHandlerManager.java | 10 +- .../master/event/TaskStateEventHandler.java | 11 +- .../event/TaskWaitTaskGroupStateHandler.java | 47 -- ...cTaskInstanceOperationFunctionManager.java | 13 - .../rpc/LogicTaskInstanceOperatorImpl.java | 14 - ...skInstanceForceStartOperationFunction.java | 58 --- .../TaskInstanceWakeupOperationFunction.java | 36 +- .../rpc/WorkflowInstanceServiceImpl.java | 10 + .../runner/WorkflowExecuteRunnable.java | 147 ++---- .../WorkflowExecuteRunnableFactory.java | 9 +- .../taskgroup/TaskGroupCoordinator.java | 451 ++++++++++++++++++ .../runner/WorkflowExecuteRunnableTest.java | 11 +- .../taskgroup/TaskGroupCoordinatorTest.java | 182 +++++++ .../registry/api/enums/RegistryNodeType.java | 1 + .../service/process/ProcessService.java | 33 -- .../service/process/ProcessServiceImpl.java | 315 ------------ .../service/process/ProcessServiceTest.java | 16 - 49 files changed, 1453 insertions(+), 918 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java create mode 100644 dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImplTest.java create mode 100644 dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index f9af440504..ea09a17fd2 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -149,7 +149,7 @@ jobs: fail-fast: false matrix: db: ["mysql", "postgresql"] - version: ["2.0.9", "3.0.6", "3.1.8", "3.2.0"] + version: ["2.0.9", "3.0.6", "3.1.9", "3.2.0"] steps: - name: Set up JDK 8 uses: actions/setup-java@v2 diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java index cb0d15909e..5d8735fbaa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java @@ -298,7 +298,7 @@ public class TaskGroupController extends BaseController { * @param pageSize page size * @return queue list */ - @Operation(summary = "queryTasksByGroupId", description = "QUERY_ALL_TASKS_GROUP_NOTES") + @Operation(summary = "queryTaskGroupQueuesByGroupId", description = "QUERY_TASKS_GROUP_GROUP_QUEUES") @Parameters({ @Parameter(name = "groupId", description = "GROUP_ID", required = false, schema = @Schema(implementation = int.class, example = "1", defaultValue = "-1")), @Parameter(name = "taskInstanceName", description = "TASK_INSTANCE_NAME", required = false, schema = @Schema(implementation = String.class, example = "taskName")), @@ -310,15 +310,21 @@ public class TaskGroupController extends BaseController { @GetMapping(value = "/query-list-by-group-id") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_TASK_GROUP_QUEUE_LIST_ERROR) - public Result queryTasksByGroupId(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId, - @RequestParam(value = "taskInstanceName", required = false) String taskName, - @RequestParam(value = "processInstanceName", required = false) String processName, - @RequestParam(value = "status", required = false) Integer status, - @RequestParam("pageNo") Integer pageNo, - @RequestParam("pageSize") Integer pageSize) { - Map result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName, processName, status, - groupId, pageNo, pageSize); + public Result queryTaskGroupQueues(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId, + @RequestParam(value = "taskInstanceName", required = false) String taskName, + @RequestParam(value = "processInstanceName", required = false) String processName, + @RequestParam(value = "status", required = false) Integer status, + @RequestParam("pageNo") Integer pageNo, + @RequestParam("pageSize") Integer pageSize) { + Map result = taskGroupQueueService.queryTasksByGroupId( + loginUser, + taskName, + processName, + status, + groupId, + pageNo, + pageSize); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java index db909da158..885e23cf70 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java @@ -50,14 +50,6 @@ public interface TaskGroupService { Map updateTaskGroup(User loginUser, int id, String name, String description, int groupSize); - /** - * get task group status - * - * @param id task group id - * @return the result code and msg - */ - boolean isTheTaskGroupAvailable(int id); - /** * query all task group by user id * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 25eca41158..7ab6102ff8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -55,7 +55,6 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.enums.TaskDependType; -import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.model.Server; @@ -83,14 +82,12 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; -import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; import org.apache.dolphinscheduler.extract.master.IStreamingTaskOperator; import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener; import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService; import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto; import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest; import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest; import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.service.command.CommandService; @@ -552,16 +549,20 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ Map result = new HashMap<>(); TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.selectById(queueId); // check process instance exist - ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId()); - if (processInstance == null) { - log.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", - taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId()); - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId()); - return result; + ProcessInstance processInstance = processInstanceDao.queryOptionalById(taskGroupQueue.getProcessId()) + .orElseThrow( + () -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId())); + checkMasterExists(); + + if (taskGroupQueue.getInQueue() == Flag.NO.getCode()) { + throw new ServiceException(Status.TASK_GROUP_QUEUE_ALREADY_START); } + taskGroupQueue.setForceStart(Flag.YES.getCode()); + taskGroupQueue.setUpdateTime(new Date()); + taskGroupQueueMapper.updateById(taskGroupQueue); - checkMasterExists(); - return forceStart(processInstance, taskGroupQueue); + result.put(Constants.STATUS, Status.SUCCESS); + return result; } public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) { @@ -664,32 +665,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - /** - * prepare to update process instance command type and status - * - * @param processInstance process instance - * @return update result - */ - private Map forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) { - Map result = new HashMap<>(); - if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) { - log.warn("Task group queue already starts, taskGroupQueueId:{}.", taskGroupQueue.getId()); - putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START); - return result; - } - - taskGroupQueue.setForceStart(Flag.YES.getCode()); - taskGroupQueue.setUpdateTime(new Date()); - processService.updateTaskGroupQueue(taskGroupQueue); - log.info("Sending force start command to master: {}.", processInstance.getHost()); - ILogicTaskInstanceOperator iLogicTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class); - iLogicTaskInstanceOperator.forceStartTaskInstance( - new TaskInstanceForceStartRequest(processInstance.getId(), taskGroupQueue.getTaskId())); - putMsg(result, Status.SUCCESS); - return result; - } - /** * check whether sub processes are offline before starting process definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java index d33115479f..02c1bea4dc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java @@ -66,8 +66,13 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr * @return tasks list */ @Override - public Map queryTasksByGroupId(User loginUser, String taskName, String processName, Integer status, - int groupId, int pageNo, int pageSize) { + public Map queryTasksByGroupId(User loginUser, + String taskName, + String processName, + Integer status, + int groupId, + int pageNo, + int pageSize) { Map result = new HashMap<>(); Page page = new Page<>(pageNo, pageSize); PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); @@ -79,8 +84,13 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr return result; } List projects = projectMapper.selectBatchIds(projectIds); - IPage taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page, - taskName, processName, status, groupId, projects); + IPage taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging( + page, + taskName, + processName, + status, + groupId, + projects); pageInfo.setTotal((int) taskGroupQueue.getTotal()); pageInfo.setTotalList(taskGroupQueue.getRecords()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java index 3122d090e2..a1d9b4acf8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java @@ -122,7 +122,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe .description(description) .groupSize(groupSize) .userId(loginUser.getId()) - .status(Flag.YES.getCode()) + .status(Flag.YES) .createTime(now) .updateTime(now) .build(); @@ -180,7 +180,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe putMsg(result, Status.TASK_GROUP_NAME_EXSIT); return result; } - if (taskGroup.getStatus() != Flag.YES.getCode()) { + if (taskGroup.getStatus() != Flag.YES) { log.warn("Task group has been closed, taskGroupId:{}.", id); putMsg(result, Status.TASK_GROUP_STATUS_ERROR); return result; @@ -202,17 +202,6 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe return result; } - /** - * get task group status - * - * @param id task group id - * @return is the task group available - */ - @Override - public boolean isTheTaskGroupAvailable(int id) { - return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1; - } - /** * query all task group by user id * @@ -331,12 +320,12 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe return result; } TaskGroup taskGroup = taskGroupMapper.selectById(id); - if (taskGroup.getStatus() == Flag.NO.getCode()) { + if (taskGroup.getStatus() == Flag.NO) { log.info("Task group has been closed, taskGroupId:{}.", id); putMsg(result, Status.TASK_GROUP_STATUS_CLOSED); return result; } - taskGroup.setStatus(Flag.NO.getCode()); + taskGroup.setStatus(Flag.NO); int update = taskGroupMapper.updateById(taskGroup); if (update > 0) log.info("Task group close complete, taskGroupId:{}.", id); @@ -364,12 +353,12 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe return result; } TaskGroup taskGroup = taskGroupMapper.selectById(id); - if (taskGroup.getStatus() == Flag.YES.getCode()) { + if (taskGroup.getStatus() == Flag.YES) { log.info("Task group has been started, taskGroupId:{}.", id); putMsg(result, Status.TASK_GROUP_STATUS_OPENED); return result; } - taskGroup.setStatus(Flag.YES.getCode()); + taskGroup.setStatus(Flag.YES); taskGroup.setUpdateTime(new Date(System.currentTimeMillis())); int update = taskGroupMapper.updateById(taskGroup); if (update > 0) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java index ae50a54037..22de6c675a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java @@ -125,7 +125,7 @@ public class TaskGroupServiceTest { .description(taskGroupDesc) .groupSize(100) .userId(1) - .status(Flag.YES.getCode()) + .status(Flag.YES) .build(); return taskGroup; @@ -204,7 +204,7 @@ public class TaskGroupServiceTest { User loginUser = getLoginUser(); TaskGroup taskGroup = getTaskGroup(); - taskGroup.setStatus(Flag.YES.getCode()); + taskGroup.setStatus(Flag.YES); // Task group status error Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.TASK_GROUP, @@ -218,7 +218,6 @@ public class TaskGroupServiceTest { logger.info(result.toString()); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - taskGroup.setStatus(0); } @Test @@ -236,12 +235,12 @@ public class TaskGroupServiceTest { Map result = taskGroupService.closeTaskGroup(loginUser, 1); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - taskGroup.setStatus(0); + taskGroup.setStatus(Flag.NO); Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup); result = taskGroupService.closeTaskGroup(loginUser, 1); Assertions.assertEquals(Status.TASK_GROUP_STATUS_CLOSED, result.get(Constants.STATUS)); - taskGroup.setStatus(1); + taskGroup.setStatus(Flag.YES); Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup); result = taskGroupService.startTaskGroup(loginUser, 1); Assertions.assertEquals(Status.TASK_GROUP_STATUS_OPENED, result.get(Constants.STATUS)); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java index 3aa49f8e17..30e8a095af 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.enums.Flag; + import java.io.Serializable; import java.util.Date; @@ -36,44 +38,17 @@ import com.baomidou.mybatisplus.annotation.TableName; @TableName("t_ds_task_group") public class TaskGroup implements Serializable { - /** - * key - */ @TableId(value = "id", type = IdType.AUTO) private Integer id; - /** - * task_group name - */ private String name; + private long projectCode; private String description; - /** - * 作业组大小 - */ private int groupSize; - /** - * 已使用作业组大小 - */ private int useSize; - /** - * creator id - */ private int userId; - /** - * 0 not available, 1 available - */ - private Integer status; - /** - * create time - */ + private Flag status; private Date createTime; - /** - * update time - */ private Date updateTime; - /** - * project code - */ - private long projectCode; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java index 794bdbab3a..1c98eeff95 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java @@ -35,20 +35,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; */ public interface TaskGroupMapper extends BaseMapper { - int robTaskGroupResource(@Param("id") int id, - @Param("currentUseSize") int currentUseSize, - @Param("queueId") int queueId, - @Param("queueStatus") int queueStatus); - - /** - * update table of task group - * - * @param id primary key - * @return affected rows - */ - int releaseTaskGroupResource(@Param("id") int id, @Param("useSize") int useSize, - @Param("queueId") int queueId, @Param("queueStatus") int queueStatus); - /** * select task groups paging * @@ -57,7 +43,8 @@ public interface TaskGroupMapper extends BaseMapper { * @param status status * @return result page */ - IPage queryTaskGroupPaging(IPage page, @Param("name") String name, + IPage queryTaskGroupPaging(IPage page, + @Param("name") String name, @Param("status") Integer status); /** @@ -69,13 +56,6 @@ public interface TaskGroupMapper extends BaseMapper { */ TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name); - /** - * Select the groupSize > useSize Count - */ - int selectAvailableCountById(@Param("groupId") int groupId); - - int selectCountByIdStatus(@Param("id") int id, @Param("status") int status); - IPage queryTaskGroupPagingByProjectCode(Page page, @Param("projectCode") Long projectCode); /** @@ -87,4 +67,12 @@ public interface TaskGroupMapper extends BaseMapper { List listAuthorizedResource(@Param("userId") int userId); List selectByProjectCode(@Param("projectCode") long projectCode); + + List queryAvailableTaskGroups(); + + List queryUsedTaskGroups(); + + int acquireTaskGroupSlot(@Param("id") Integer id); + + int releaseTaskGroupSlot(@Param("id") Integer id); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java index 437a22efb7..cada1c7092 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java @@ -104,5 +104,22 @@ public interface TaskGroupQueueMapper extends BaseMapper { void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId); + void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") List workflowInstanceIds); + void deleteByTaskGroupIds(@Param("taskGroupIds") List taskGroupIds); + + void updateTaskGroupPriorityByTaskInstanceId(@Param("taskInstanceId") Integer taskInstanceId, + @Param("priority") int taskGroupPriority); + + List queryAllInQueueTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId, + @Param("inQueue") int inQueue); + + List queryAllTaskGroupQueueByInQueue(@Param("inQueue") int inQueue); + + List queryByTaskInstanceId(@Param("taskInstanceId") Integer taskInstanceId); + + List queryUsingTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId, + @Param("status") int status, + @Param("inQueue") int inQueue, + @Param("forceStart") int forceStart); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java new file mode 100644 index 0000000000..ecdf68a2eb --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.TaskGroup; + +import java.util.List; + +public interface TaskGroupDao extends IDao { + + /** + * Query all TaskGroups + * + * @return all TaskGroups + */ + List queryAllTaskGroups(); + + /** + * Query all TaskGroups which useSize > 0 + * + * @return the TaskGroups which useSize > 0 + */ + List queryUsedTaskGroups(); + + /** + * Query all TaskGroups which useSize < groupSize + * + * @return the TaskGroups which useSize < groupSize + */ + List queryAvailableTaskGroups(); + + /** + * Acquire a slot for the TaskGroup which useSize should < groupSize, set the useSize = useSize + 1. + * + * @param taskGroupId taskGroupId which shouldn't be null + * @return true if acquire successfully, false otherwise. + */ + boolean acquireTaskGroupSlot(Integer taskGroupId); + + /** + * Release a slot for the TaskGroup which useSize should > 0, set the useSize = useSize - 1. + * + * @param taskGroupId taskGroupId which shouldn't be null + * @return true if release successfully, false otherwise. + */ + boolean releaseTaskGroupSlot(Integer taskGroupId); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java new file mode 100644 index 0000000000..a788b29bb1 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; + +import java.util.List; + +public interface TaskGroupQueueDao extends IDao { + + /** + * Delete {@link TaskGroupQueue} by {@link ProcessInstance#getId()} + * + * @param workflowInstanceIds workflowInstanceIds + */ + void deleteByWorkflowInstanceIds(List workflowInstanceIds); + + /** + * Query all {@link TaskGroupQueue} which in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} + * + * @return TaskGroupQueue ordered by priority desc + */ + List queryAllInQueueTaskGroupQueue(); + + /** + * Query all {@link TaskGroupQueue} which in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} and taskGroupId is taskGroupId + * + * @param taskGroupId taskGroupId + * @return TaskGroupQueue ordered by priority desc + */ + List queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId); + + /** + * Query all {@link TaskGroupQueue} which taskId is taskInstanceId + * + * @param taskInstanceId taskInstanceId + * @return TaskGroupQueue ordered by priority desc + */ + List queryByTaskInstanceId(Integer taskInstanceId); + + /** + * Query all {@link TaskGroupQueue} which status is TaskGroupQueueStatus.ACQUIRE_SUCCESS and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#NO}. + * + * @param taskGroupId taskGroupId + * @return TaskGroupQueue + */ + List queryAcquiredTaskGroupQueueByGroupId(Integer taskGroupId); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java new file mode 100644 index 0000000000..80d2c473d7 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.TaskGroup; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupDao; + +import java.util.List; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Repository; + +@Slf4j +@Repository +public class TaskGroupDaoImpl extends BaseDao implements TaskGroupDao { + + public TaskGroupDaoImpl(@NonNull TaskGroupMapper taskGroupMapper) { + super(taskGroupMapper); + } + + @Override + public List queryAllTaskGroups() { + return mybatisMapper.selectList(null); + } + + @Override + public List queryUsedTaskGroups() { + return mybatisMapper.queryUsedTaskGroups(); + } + + @Override + public List queryAvailableTaskGroups() { + return mybatisMapper.queryAvailableTaskGroups(); + } + + @Override + public boolean acquireTaskGroupSlot(Integer taskGroupId) { + if (taskGroupId == null) { + throw new IllegalArgumentException("taskGroupId cannot be null"); + } + return mybatisMapper.acquireTaskGroupSlot(taskGroupId) > 0; + } + + @Override + public boolean releaseTaskGroupSlot(Integer taskGroupId) { + if (taskGroupId == null) { + throw new IllegalArgumentException("taskGroupId cannot be null"); + } + return mybatisMapper.releaseTaskGroupSlot(taskGroupId) > 0; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java new file mode 100644 index 0000000000..a1808a9091 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; + +import lombok.NonNull; + +import org.springframework.stereotype.Repository; + +@Repository +public class TaskGroupQueueDaoImpl extends BaseDao implements TaskGroupQueueDao { + + public TaskGroupQueueDaoImpl(@NonNull TaskGroupQueueMapper taskGroupQueueMapper) { + super(taskGroupQueueMapper); + } + + @Override + public void deleteByWorkflowInstanceIds(List workflowInstanceIds) { + if (CollectionUtils.isEmpty(workflowInstanceIds)) { + return; + } + mybatisMapper.deleteByWorkflowInstanceIds(workflowInstanceIds); + } + + @Override + public List queryAllInQueueTaskGroupQueue() { + return mybatisMapper.queryAllTaskGroupQueueByInQueue(Flag.YES.getCode()); + } + + @Override + public List queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId) { + return mybatisMapper.queryAllInQueueTaskGroupQueueByGroupId(taskGroupId, Flag.YES.getCode()); + } + + @Override + public List queryByTaskInstanceId(Integer taskInstanceId) { + return mybatisMapper.queryByTaskInstanceId(taskInstanceId); + } + + @Override + public List queryAcquiredTaskGroupQueueByGroupId(Integer taskGroupId) { + return mybatisMapper.queryUsingTaskGroupQueueByGroupId( + taskGroupId, + TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode(), + Flag.YES.getCode(), + Flag.NO.getCode()); + } +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml index c3f1df0b0e..eb14e03dd4 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml @@ -67,24 +67,6 @@ where project_code = #{projectCode} - - - update t_ds_task_group - set use_size = use_size + 1 - where id = #{id} - and use_size < group_size - and use_size = #{currentUseSize} - and (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus}) = 1 - - - - - update t_ds_task_group - set use_size = use_size-1 - where id = #{id} and use_size > 0 and - (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1 - - - select - count(1) + from t_ds_task_group - where - id = #{groupId} and use_size < group_size + where 1=1 + + and user_id = #{userId} + - select - count(1) + from t_ds_task_group - where - id = #{id} and status = #{status} + where use_size group_size - select - + from t_ds_task_group - where 1=1 - - and user_id = #{userId} - + where use_size ]]> 0 + + update t_ds_task_group + set use_size = use_size + 1 + where id = #{id} + and use_size < group_size + + + + update t_ds_task_group + set use_size = use_size - 1 + where id = #{id} + and use_size > 0 + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml index a5bd80fbad..790ad7bfae 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml @@ -46,7 +46,7 @@ and group_id = #{groupId} - order by update_time desc + order by update_time desc, id desc select - queue.id, queue.task_name, queue.group_id, queue.process_id, queue.priority, queue.status - , queue.force_start, queue.create_time, queue.update_time, - process.name as processInstanceName,p.name as projectName,p.code as projectCode + queue.id, + queue.task_name, + queue.group_id, + queue.process_id, + queue.priority, + queue.in_queue, + queue.status, + queue.force_start, + queue.create_time, + queue.update_time, + process.name as processInstanceName, + p.name as projectName, + p.code as projectCode from t_ds_task_group_queue queue left join t_ds_process_instance process on queue.process_id = process.id left join t_ds_process_definition p_f on process.process_definition_code = p_f.code @@ -171,6 +181,15 @@ where process_id = #{workflowInstanceId} + + delete + from t_ds_task_group_queue + where process_id in + + #{i} + + + delete from t_ds_task_group_queue @@ -180,4 +199,38 @@ + + update t_ds_task_group_queue + set priority = #{priority} + where task_id = #{taskInstanceId} + + + + + + + + + + diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index c22f12c09a..656e34af0e 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -1976,6 +1976,7 @@ CREATE TABLE t_ds_task_group_queue in_queue int(4) DEFAULT '0' , create_time datetime DEFAULT NULL , update_time datetime DEFAULT NULL , + KEY idx_t_ds_task_group_queue_in_queue (in_queue) , PRIMARY KEY (id) ); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 347acb0a64..0d3e8b190a 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -1964,6 +1964,7 @@ CREATE TABLE `t_ds_task_group_queue` ( `in_queue` tinyint(4) DEFAULT '0' COMMENT 'ready to get the queue by other task finish 0 NO ,1 YES', `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + KEY `idx_t_ds_task_group_queue_in_queue` (`in_queue`), PRIMARY KEY( `id` ) )ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8 COLLATE = utf8_bin; diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 66bcb17f38..b54f19a2d1 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -1946,6 +1946,8 @@ CREATE TABLE t_ds_task_group_queue ( PRIMARY KEY (id) ); +create index idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue(in_queue); + -- -- Table structure for table t_ds_task_group -- diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql index 957f072a43..f4d7c510c1 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql @@ -109,4 +109,19 @@ ALTER TABLE `t_ds_process_definition_log` MODIFY COLUMN `version` int NOT NULL D ALTER TABLE `t_ds_process_instance` MODIFY COLUMN `process_definition_version` int NOT NULL DEFAULT 1 COMMENT "process definition version"; ALTER TABLE `t_ds_task_definition` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; ALTER TABLE `t_ds_task_definition_log` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; -ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; \ No newline at end of file +ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; + +-- create idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue +DROP PROCEDURE IF EXISTS create_idx_t_ds_task_group_queue_in_queue; +delimiter d// +CREATE PROCEDURE create_idx_t_ds_task_group_queue_in_queue() +BEGIN + DECLARE index_exists INT DEFAULT 0; + SELECT COUNT(*) INTO index_exists FROM information_schema.statistics WHERE table_schema = (SELECT DATABASE()) AND table_name = 't_ds_task_group_queue' AND index_name = 'idx_t_ds_task_group_queue_in_queue'; + IF index_exists = 0 THEN CREATE INDEX idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue); +END IF; +END; +d// +delimiter ; +CALL create_idx_t_ds_task_group_queue_in_queue; +DROP PROCEDURE create_idx_t_ds_task_group_queue_in_queue; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql index ce48a3374e..157ad548d3 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql @@ -29,4 +29,6 @@ ALTER TABLE "t_ds_process_definition_log" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_task_definition" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_task_definition_log" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_process_instance" ALTER COLUMN "process_definition_version" SET NOT NULL, ALTER COLUMN "process_definition_version" SET DEFAULT 1; -ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1; \ No newline at end of file +ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1; + +CREATE INDEX IF NOT EXISTS idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue); \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql index 6c73e112da..657414b4a0 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql @@ -80,4 +80,19 @@ ALTER TABLE `t_ds_process_definition_log` MODIFY COLUMN `version` int NOT NULL D ALTER TABLE `t_ds_process_instance` MODIFY COLUMN `process_definition_version` int NOT NULL DEFAULT 1 COMMENT "process definition version"; ALTER TABLE `t_ds_task_definition` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; ALTER TABLE `t_ds_task_definition_log` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; -ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; \ No newline at end of file +ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; + +-- create idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue +DROP PROCEDURE IF EXISTS create_idx_t_ds_task_group_queue_in_queue; +delimiter d// +CREATE PROCEDURE create_idx_t_ds_task_group_queue_in_queue() +BEGIN + DECLARE index_exists INT DEFAULT 0; + SELECT COUNT(*) INTO index_exists FROM information_schema.statistics WHERE table_schema = (SELECT DATABASE()) AND table_name = 't_ds_task_group_queue' AND index_name = 'idx_t_ds_task_group_queue_in_queue'; + IF index_exists = 0 THEN CREATE INDEX idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue); +END IF; +END; +d// +delimiter ; +CALL create_idx_t_ds_task_group_queue_in_queue; +DROP PROCEDURE create_idx_t_ds_task_group_queue_in_queue; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql index fd13ed3594..e984774010 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -68,4 +68,6 @@ ALTER TABLE "t_ds_process_definition_log" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_task_definition" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_task_definition_log" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_process_instance" ALTER COLUMN "process_definition_version" SET NOT NULL, ALTER COLUMN "process_definition_version" SET DEFAULT 1; -ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1; \ No newline at end of file +ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1; + +CREATE INDEX IF NOT EXISTS idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue); \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java index 71c01bd95e..b457406baf 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.mapper; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.entity.TaskGroup; @@ -24,8 +25,6 @@ import java.util.Date; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -33,8 +32,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; public class TaskGroupMapperTest extends BaseDaoTest { - private static final Logger logger = LoggerFactory.getLogger(TaskGroupMapperTest.class); - @Autowired TaskGroupMapper taskGroupMapper; @@ -46,7 +43,7 @@ public class TaskGroupMapperTest extends BaseDaoTest { taskGroup.setName("task group"); taskGroup.setId(1); taskGroup.setUserId(1); - taskGroup.setStatus(1); + taskGroup.setStatus(Flag.YES); taskGroup.setGroupSize(10); taskGroup.setDescription("this is a task group"); Date date = new Date(System.currentTimeMillis()); @@ -88,7 +85,7 @@ public class TaskGroupMapperTest extends BaseDaoTest { Page page = new Page(1, 3); IPage taskGroupIPage = taskGroupMapper.queryTaskGroupPaging( page, - taskGroup.getName(), taskGroup.getStatus()); + taskGroup.getName(), taskGroup.getStatus().getCode()); Assertions.assertEquals(taskGroupIPage.getTotal(), 1); } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImplTest.java new file mode 100644 index 0000000000..1f0cdf24e0 --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImplTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.dao.BaseDaoTest; +import org.apache.dolphinscheduler.dao.entity.TaskGroup; +import org.apache.dolphinscheduler.dao.repository.TaskGroupDao; + +import java.util.Date; +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +class TaskGroupDaoImplTest extends BaseDaoTest { + + @Autowired + private TaskGroupDao taskGroupDao; + + @Test + void queryAllTaskGroups() { + TaskGroup taskGroup = createTaskGroup("test", 0, 1); + taskGroupDao.insert(taskGroup); + List taskGroups = taskGroupDao.queryAllTaskGroups(); + assertEquals(1, taskGroups.size()); + } + + @Test + void queryUsedTaskGroups() { + // Insert a unused task group + TaskGroup taskGroup = createTaskGroup("testUnused", 0, 1); + taskGroupDao.insert(taskGroup); + assertEquals(0, taskGroupDao.queryUsedTaskGroups().size()); + + // Insert a used task group + taskGroup = createTaskGroup("testUsed", 1, 1); + taskGroupDao.insert(taskGroup); + assertEquals(1, taskGroupDao.queryUsedTaskGroups().size()); + } + + @Test + void queryAvailableTaskGroups() { + // Insert a full task group + TaskGroup taskGroup = createTaskGroup("testFull", 1, 1); + taskGroupDao.insert(taskGroup); + assertEquals(0, taskGroupDao.queryAvailableTaskGroups().size()); + + // Insert a used task group + taskGroup = createTaskGroup("testNotFull", 0, 1); + taskGroupDao.insert(taskGroup); + assertEquals(1, taskGroupDao.queryAvailableTaskGroups().size()); + } + + @Test + void acquireTaskGroupSlot() { + // Insert a full task group will acquire failed + TaskGroup taskGroup = createTaskGroup("testFull", 1, 1); + taskGroupDao.insert(taskGroup); + assertFalse(taskGroupDao.acquireTaskGroupSlot(taskGroup.getId())); + + taskGroup.setUseSize(0); + taskGroupDao.updateById(taskGroup); + assertTrue(taskGroupDao.acquireTaskGroupSlot(taskGroup.getId())); + + taskGroup = taskGroupDao.queryById(taskGroup.getId()); + assertEquals(1, taskGroup.getUseSize()); + } + + @Test + void releaseTaskGroupSlot() { + // Insert an empty task group will release failed + TaskGroup taskGroup = createTaskGroup("testEmpty", 0, 1); + taskGroupDao.insert(taskGroup); + assertFalse(taskGroupDao.releaseTaskGroupSlot(taskGroup.getId())); + + taskGroup.setUseSize(1); + taskGroupDao.updateById(taskGroup); + assertTrue(taskGroupDao.releaseTaskGroupSlot(taskGroup.getId())); + + taskGroup = taskGroupDao.queryById(taskGroup.getId()); + assertEquals(0, taskGroup.getUseSize()); + } + + private TaskGroup createTaskGroup(String name, int useSize, int groupSize) { + return TaskGroup.builder() + .name(name) + .description("test") + .groupSize(groupSize) + .useSize(useSize) + .userId(1) + .status(Flag.YES) + .createTime(new Date()) + .updateTime(new Date()) + .projectCode(1) + .build(); + } + +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java new file mode 100644 index 0000000000..17c1537184 --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.dao.BaseDaoTest; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; +import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao; + +import java.util.Date; + +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +class TaskGroupQueueDaoImplTest extends BaseDaoTest { + + @Autowired + private TaskGroupQueueDao taskGroupQueueDao; + + @Test + void deleteByWorkflowInstanceIds() { + TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueueDao.insert(taskGroupQueue); + assertNotNull(taskGroupQueueDao.queryById(taskGroupQueue.getId())); + + taskGroupQueueDao.deleteByWorkflowInstanceIds(Lists.newArrayList(1)); + assertNull(taskGroupQueueDao.queryById(taskGroupQueue.getId())); + } + + @Test + void queryAllInQueueTaskGroupQueue() { + TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueueDao.insert(taskGroupQueue); + assertEquals(1, taskGroupQueueDao.queryAllInQueueTaskGroupQueue().size()); + } + + @Test + void queryAllInQueueTaskGroupQueueByGroupId() { + TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueueDao.insert(taskGroupQueue); + assertEquals(1, taskGroupQueueDao.queryAllInQueueTaskGroupQueueByGroupId(1).size()); + } + + @Test + void updateById() { + TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.WAIT_QUEUE); + taskGroupQueueDao.insert(taskGroupQueue); + + taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueueDao.updateById(taskGroupQueue); + assertEquals(TaskGroupQueueStatus.ACQUIRE_SUCCESS, + taskGroupQueueDao.queryById(taskGroupQueue.getId()).getStatus()); + } + + @Test + void queryByTaskInstanceId() { + TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueueDao.insert(taskGroupQueue); + assertEquals(1, taskGroupQueueDao.queryByTaskInstanceId(1).size()); + } + + @Test + void queryUsingTaskGroupQueueByGroupId() { + TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueueDao.insert(taskGroupQueue); + assertEquals(1, taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(1).size()); + + taskGroupQueue = createTaskGroupQueue(Flag.YES, TaskGroupQueueStatus.WAIT_QUEUE); + taskGroupQueueDao.insert(taskGroupQueue); + assertEquals(1, taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(1).size()); + } + + private TaskGroupQueue createTaskGroupQueue(Flag forceStart, TaskGroupQueueStatus taskGroupQueueStatus) { + return TaskGroupQueue.builder() + .taskId(1) + .taskName("test") + .groupId(1) + .processId(1) + .priority(0) + .forceStart(forceStart.getCode()) + .inQueue(Flag.YES.getCode()) + .status(taskGroupQueueStatus) + .createTime(new Date()) + .updateTime(new Date()) + .build(); + } +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java index a4fb547028..f85200b7a2 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java @@ -25,9 +25,6 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; @RpcService public interface ILogicTaskInstanceOperator { @@ -41,10 +38,4 @@ public interface ILogicTaskInstanceOperator { @RpcMethod LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequest); - @RpcMethod - TaskInstanceForceStartResponse forceStartTaskInstance(TaskInstanceForceStartRequest taskForceStartRequest); - - @RpcMethod - void wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest); - } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java index 217e81322e..a535f8c6ca 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java @@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.extract.master; import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcService; import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse; @RpcService public interface IWorkflowInstanceService { @@ -30,4 +32,7 @@ public interface IWorkflowInstanceService { @RpcMethod WorkflowExecuteDto getWorkflowExecutingData(Integer workflowInstanceId); + @RpcMethod + TaskInstanceWakeupResponse wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest); + } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java deleted file mode 100644 index 0a4711fa1e..0000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.extract.master.transportor; - -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -public class TaskInstanceForceStartRequest { - - private String key; - - private int processInstanceId; - - private int taskInstanceId; - - public TaskInstanceForceStartRequest( - int processInstanceId, - int taskInstanceId) { - this.key = String.format("%d-%d", processInstanceId, taskInstanceId); - - this.processInstanceId = processInstanceId; - this.taskInstanceId = taskInstanceId; - } - -} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java deleted file mode 100644 index db19c023dc..0000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.extract.master.transportor; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class TaskInstanceForceStartResponse { - - private boolean success; - - private String message; - - public static TaskInstanceForceStartResponse success() { - return new TaskInstanceForceStartResponse(true, "dispatch success"); - } - - public static TaskInstanceForceStartResponse failed(String message) { - return new TaskInstanceForceStartResponse(false, message); - } - -} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java index 5bfbc03f1c..ddca440436 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java @@ -17,26 +17,21 @@ package org.apache.dolphinscheduler.extract.master.transportor; +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data +@Builder @NoArgsConstructor -public class TaskInstanceWakeupRequest { - - private String key; +@AllArgsConstructor +public class TaskInstanceWakeupRequest implements Serializable { private int processInstanceId; private int taskInstanceId; - public TaskInstanceWakeupRequest( - int processInstanceId, - int taskInstanceId) { - this.key = String.format("%d-%d", processInstanceId, taskInstanceId); - - this.processInstanceId = processInstanceId; - this.taskInstanceId = taskInstanceId; - } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 7d6600cc43..d37ca9d016 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap; +import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import javax.annotation.PostConstruct; @@ -84,6 +85,9 @@ public class MasterServer implements IStoppable { @Autowired private MasterSlotManager masterSlotManager; + @Autowired + private TaskGroupCoordinator taskGroupCoordinator; + public static void main(String[] args) { MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount); @@ -115,6 +119,7 @@ public class MasterServer implements IStoppable { this.failoverExecuteThread.start(); this.schedulerApi.start(); + this.taskGroupCoordinator.start(); MasterServerMetrics.registerMasterCpuUsageGauge(() -> { SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java index b91da8cc51..7cd89b91fa 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java @@ -24,14 +24,20 @@ import java.util.Map; import java.util.Optional; import java.util.ServiceLoader; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class StateEventHandlerManager { private static final Map stateEventHandlerMap = new HashMap<>(); static { ServiceLoader.load(StateEventHandler.class) - .forEach(stateEventHandler -> stateEventHandlerMap.put(stateEventHandler.getEventType(), - stateEventHandler)); + .forEach(stateEventHandler -> { + log.info("Initialize StateEventHandler: {} for eventType: {}", + stateEventHandler.getClass().getName(), stateEventHandler.getEventType()); + stateEventHandlerMap.put(stateEventHandler.getEventType(), stateEventHandler); + }); } public static Optional getStateEventHandler(StateEventType stateEventType) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java index f790131a70..2dde1315de 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java @@ -62,7 +62,8 @@ public class TaskStateEventHandler implements StateEventHandler { if (task.getState().isFinished() && (taskStateEvent.getStatus() != null && taskStateEvent.getStatus().isRunning())) { String errorMessage = String.format( - "The current task instance state is %s, but the task state event status is %s, so the task state event will be ignored", + "The current TaskInstance: %s state is %s, but the task state event status is %s, so the task state event will be ignored", + task.getName(), task.getState().name(), taskStateEvent.getStatus().name()); log.warn(errorMessage); @@ -75,14 +76,6 @@ public class TaskStateEventHandler implements StateEventHandler { return true; } workflowExecuteRunnable.taskFinished(task); - if (task.getTaskGroupId() > 0) { - log.info("The task instance need to release task Group: {}", task.getTaskGroupId()); - try { - workflowExecuteRunnable.releaseTaskGroup(task); - } catch (Exception e) { - throw new StateEventHandleException("Release task group failed", e); - } - } return true; } return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java deleted file mode 100644 index 4859e896b9..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.event; - -import org.apache.dolphinscheduler.common.enums.StateEventType; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; - -import lombok.extern.slf4j.Slf4j; - -import com.google.auto.service.AutoService; - -@AutoService(StateEventHandler.class) -@Slf4j -public class TaskWaitTaskGroupStateHandler implements StateEventHandler { - - @Override - public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, - StateEvent stateEvent) { - log.info("Handle task instance wait task group event, taskInstanceId: {}", stateEvent.getTaskInstanceId()); - if (workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) { - log.info("Success wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId()); - } else { - log.info("Failed to wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId()); - } - return true; - } - - @Override - public StateEventType getEventType() { - return StateEventType.WAKE_UP_TASK_GROUP; - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java index 3a07d68364..de4095831e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java @@ -32,12 +32,6 @@ public class LogicTaskInstanceOperationFunctionManager { @Autowired private LogicITaskInstancePauseOperationFunction logicITaskInstancePauseOperationFunction; - @Autowired - private TaskInstanceForceStartOperationFunction taskInstanceForceStartOperationFunction; - - @Autowired - private TaskInstanceWakeupOperationFunction taskInstanceWakeupOperationFunction; - public LogicITaskInstanceDispatchOperationFunction getLogicTaskInstanceDispatchOperationFunction() { return logicITaskInstanceDispatchOperationFunction; } @@ -50,11 +44,4 @@ public class LogicTaskInstanceOperationFunctionManager { return logicITaskInstancePauseOperationFunction; } - public TaskInstanceForceStartOperationFunction getTaskInstanceForceStartOperationFunction() { - return taskInstanceForceStartOperationFunction; - } - - public TaskInstanceWakeupOperationFunction getTaskInstanceWakeupOperationFunction() { - return taskInstanceWakeupOperationFunction; - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java index c99968d1e9..39446bd89d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java @@ -24,9 +24,6 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; import lombok.extern.slf4j.Slf4j; @@ -58,15 +55,4 @@ public class LogicTaskInstanceOperatorImpl implements ILogicTaskInstanceOperator .operate(taskPauseRequest); } - @Override - public TaskInstanceForceStartResponse forceStartTaskInstance(TaskInstanceForceStartRequest taskForceStartRequest) { - return logicTaskInstanceOperationFunctionManager.getTaskInstanceForceStartOperationFunction() - .operate(taskForceStartRequest); - } - - @Override - public void wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest) { - logicTaskInstanceOperationFunctionManager.getTaskInstanceWakeupOperationFunction().operate(taskWakeupRequest); - } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java deleted file mode 100644 index a15766e4c4..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.rpc; - -import org.apache.dolphinscheduler.common.enums.StateEventType; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartResponse; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; -import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class TaskInstanceForceStartOperationFunction - implements - ITaskInstanceOperationFunction { - - @Autowired - private StateEventResponseService stateEventResponseService; - - @Override - public TaskInstanceForceStartResponse operate(TaskInstanceForceStartRequest taskInstanceForceStartRequest) { - TaskStateEvent stateEvent = TaskStateEvent.builder() - .processInstanceId(taskInstanceForceStartRequest.getProcessInstanceId()) - .taskInstanceId(taskInstanceForceStartRequest.getTaskInstanceId()) - .key(taskInstanceForceStartRequest.getKey()) - .type(StateEventType.WAKE_UP_TASK_GROUP) - .build(); - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); - log.info("Received forceStartTaskInstance, event: {}", stateEvent); - stateEventResponseService.addEvent2WorkflowExecute(stateEvent); - return TaskInstanceForceStartResponse.success(); - } finally { - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java index 9a5b716e90..4a83874fb9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java @@ -17,12 +17,12 @@ package org.apache.dolphinscheduler.server.master.rpc; -import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; -import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import lombok.extern.slf4j.Slf4j; @@ -36,20 +36,30 @@ public class TaskInstanceWakeupOperationFunction ITaskInstanceOperationFunction { @Autowired - private StateEventResponseService stateEventResponseService; + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @Override public TaskInstanceWakeupResponse operate(TaskInstanceWakeupRequest taskInstanceWakeupRequest) { - TaskStateEvent stateEvent = TaskStateEvent.builder() - .processInstanceId(taskInstanceWakeupRequest.getProcessInstanceId()) - .taskInstanceId(taskInstanceWakeupRequest.getTaskInstanceId()) - .key(taskInstanceWakeupRequest.getKey()) - .type(StateEventType.WAKE_UP_TASK_GROUP) - .build(); try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); - log.info("Received wakeupTaskInstance request, event: {}", stateEvent); - stateEventResponseService.addEvent2WorkflowExecute(stateEvent); + log.info("Received TaskInstanceWakeupRequest request{}", taskInstanceWakeupRequest); + + int workflowInstanceId = taskInstanceWakeupRequest.getProcessInstanceId(); + int taskInstanceId = taskInstanceWakeupRequest.getTaskInstanceId(); + LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstanceId, taskInstanceId); + WorkflowExecuteRunnable workflowExecuteRunnable = + processInstanceExecCacheManager.getByProcessInstanceId(workflowInstanceId); + if (workflowExecuteRunnable == null) { + log.warn("cannot find WorkflowExecuteRunnable: {}, no need to Wakeup task", workflowInstanceId); + return TaskInstanceWakeupResponse.failed("cannot find WorkflowExecuteRunnable: " + workflowInstanceId); + } + DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = + workflowExecuteRunnable.getTaskExecuteRunnableById(taskInstanceId).orElse(null); + if (defaultTaskExecuteRunnable == null) { + log.warn("Cannot find DefaultTaskExecuteRunnable: {}, cannot Wakeup task", taskInstanceId); + return TaskInstanceWakeupResponse.failed("Cannot find DefaultTaskExecuteRunnable: " + taskInstanceId); + } + defaultTaskExecuteRunnable.dispatch(); + log.info("Success Wakeup TaskInstance: {}", taskInstanceId); return TaskInstanceWakeupResponse.success(); } finally { LogUtils.removeWorkflowAndTaskInstanceIdMDC(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java index c3420cb1da..d10c8b81c4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.rpc; import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService; import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.service.ExecutingService; @@ -36,6 +38,9 @@ public class WorkflowInstanceServiceImpl implements IWorkflowInstanceService { @Autowired private ExecutingService executingService; + @Autowired + private TaskInstanceWakeupOperationFunction taskInstanceWakeupOperationFunction; + @Override public void clearWorkflowMetrics(Long workflowDefinitionCode) { log.info("Receive clearWorkflowMetrics request: {}", workflowDefinitionCode); @@ -49,4 +54,9 @@ public class WorkflowInstanceServiceImpl implements IWorkflowInstanceService { executingService.queryWorkflowExecutingData(workflowInstanceId); return workflowExecuteDtoOptional.orElse(null); } + + @Override + public TaskInstanceWakeupResponse wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest) { + return taskInstanceWakeupOperationFunction.operate(taskWakeupRequest); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 3759f2209c..2db49c6f09 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.StateEventType; -import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; @@ -52,14 +51,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; -import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse; @@ -81,6 +77,7 @@ import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent; import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; import org.apache.dolphinscheduler.server.master.utils.TaskUtils; import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; @@ -114,7 +111,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import lombok.NonNull; @@ -227,6 +223,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { private final ListenerEventAlertManager listenerEventAlertManager; + private final TaskGroupCoordinator taskGroupCoordinator; + public WorkflowExecuteRunnable( @NonNull IWorkflowExecuteContext workflowExecuteContext, @NonNull CommandService commandService, @@ -238,7 +236,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { @NonNull CuringParamsService curingParamsService, @NonNull TaskInstanceDao taskInstanceDao, @NonNull DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory, - @NonNull ListenerEventAlertManager listenerEventAlertManager) { + @NonNull ListenerEventAlertManager listenerEventAlertManager, + @NonNull TaskGroupCoordinator taskGroupCoordinator) { this.processService = processService; this.commandService = commandService; this.processInstanceDao = processInstanceDao; @@ -250,6 +249,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { this.taskInstanceDao = taskInstanceDao; this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory; this.listenerEventAlertManager = listenerEventAlertManager; + this.taskGroupCoordinator = taskGroupCoordinator; TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size); } @@ -339,43 +339,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { return this.stateEvents.size(); } - public boolean checkForceStartAndWakeUp(StateEvent stateEvent) { - TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); - if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { - log.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId()); - TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId()); - - DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = - taskExecuteRunnableMap.get(taskInstance.getTaskCode()); - if (defaultTaskExecuteRunnable != null) { - defaultTaskExecuteRunnable.dispatch(); - this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); - log.info("Success force start task: {}, taskGroup: {}", taskGroupQueue.getTaskName(), - taskGroupQueue.getGroupId()); - } else { - log.warn("Cannot find the TaskExecuteRunnable: {}", taskGroupQueue.getTaskName()); - } - return true; - } - if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { - log.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId()); - boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue); - if (acquireTaskGroup) { - TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId()); - taskExecuteRunnableMap.get(taskInstance.getTaskCode()).dispatch(); - log.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId()); - return true; - } - log.warn("Failed to wake up taskGroupQueue, taskGroupQueueId: {}", taskGroupQueue.getId()); - return false; - } else { - log.info( - "Failed to wake up the taskGroupQueue: {}, since the taskGroupQueue is not in queue, will no need to wake up.", - taskGroupQueue); - return true; - } - } public void processStart() { ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); @@ -408,6 +371,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { taskExecuteRunnableMap.remove(taskInstance.getTaskCode()); stateWheelExecuteThread.removeTask4TimeoutCheck(workflowInstance, taskInstance); stateWheelExecuteThread.removeTask4RetryCheck(workflowInstance, taskInstance); + if (taskInstance.getTaskGroupId() > 0) { + releaseTaskGroupIfNeeded(taskInstance); + log.info("Release task Group slot: {} for taskInstance: {} ", taskInstance.getTaskGroupId(), + taskInstance.getId()); + } if (taskInstance.getState().isSuccess()) { completeTaskSet.add(taskInstance.getTaskCode()); @@ -462,51 +430,16 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } } - /** - * release task group - * - */ - public void releaseTaskGroup(TaskInstance taskInstance) { - ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + private void releaseTaskGroupIfNeeded(TaskInstance taskInstance) { // todo: use Integer if (taskInstance.getTaskGroupId() <= 0) { log.info("The current TaskInstance: {} doesn't use taskGroup, no need to release taskGroup", taskInstance.getName()); return; } - TaskInstance nextTaskInstance = processService.releaseTaskGroup(taskInstance); - if (nextTaskInstance == null) { - log.info( - "The current TaskInstance: {} is the last taskInstance in the taskGroup, no need to wakeup next taskInstance", - taskInstance.getName()); - return; - } - if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) { - TaskStateEvent nextEvent = TaskStateEvent.builder() - .processInstanceId(workflowInstance.getId()) - .taskInstanceId(nextTaskInstance.getId()) - .type(StateEventType.WAKE_UP_TASK_GROUP) - .build(); - stateEvents.add(nextEvent); - } else { - ProcessInstance processInstance = - processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); - if (processInstance == null) { - log.error("WorkflowInstance is null cannot wakeup, processInstanceId:{}", - nextTaskInstance.getProcessInstanceId()); - return; - } - if (processInstance.getHost() == null || Constants.NULL.equals(processInstance.getHost())) { - log.warn("The next WorkflowInstance: {} host is null no need to wakeup, maybe it is in failover", - processInstance); - return; - } - ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class); - taskInstanceOperator.wakeupTaskInstance( - new TaskInstanceWakeupRequest(processInstance.getId(), nextTaskInstance.getId())); - } - log.info("Success send wakeup message to next taskInstance: {}", nextTaskInstance.getId()); + taskGroupCoordinator.releaseTaskGroupSlot(taskInstance); + log.info("Success release task Group slot: {} for taskInstance: {} ", taskInstance.getTaskGroupId(), + taskInstance.getName()); } /** @@ -790,10 +723,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } else { listenerEventAlertManager.publishProcessFailListenerEvent(workflowInstance, projectUser); } - if (checkTaskQueue()) { - // release task group - processService.releaseAllTaskGroup(workflowInstance.getId()); - } + taskInstanceMap.forEach((id, taskInstance) -> { + if (taskInstance != null && taskInstance.getTaskGroupId() > 0) { + releaseTaskGroupIfNeeded(taskInstance); + } + }); // Log the workflowInstance in detail log.info(WorkflowInstanceUtils.logWorkflowInstanceInDetails(workflowInstance)); } @@ -1006,19 +940,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { // it will be wakeup when other tasks release the resource. int taskGroupId = taskInstance.getTaskGroupId(); if (taskGroupId > 0) { - boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(), - taskInstance.getName(), - taskGroupId, - taskInstance.getProcessInstanceId(), - taskInstance.getTaskGroupPriority()); - if (!acquireTaskGroup) { - log.info( - "Submitted task will not be dispatch right now because the first time to try to acquire" - + - " task group failed, taskInstanceName: {}, taskGroupId: {}", - taskInstance.getName(), taskGroupId); - return true; - } + taskGroupCoordinator.acquireTaskGroupSlot(taskInstance); + log.info("The TaskInstance: {} use taskGroup: {} to manage the resource, will wait to notify it", + taskInstance, + taskGroupId); + return true; } // 4. submit to dispatch queue tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable); @@ -1371,7 +1297,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { // set the task instance state to fault tolerance existTaskInstance.setFlag(Flag.NO); existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE); - releaseTaskGroup(existTaskInstance); + releaseTaskGroupIfNeeded(existTaskInstance); validTaskMap.remove(existTaskInstance.getTaskCode()); taskInstanceDao.updateById(existTaskInstance); @@ -2090,16 +2016,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { return recoveryNodeCodeList; } - private boolean checkTaskQueue() { - AtomicBoolean result = new AtomicBoolean(false); - taskInstanceMap.forEach((id, taskInstance) -> { - if (taskInstance != null && taskInstance.getTaskGroupId() > 0) { - result.set(true); - } - }); - return result.get(); - } - private boolean isNewProcessInstance() { ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); if (Flag.YES.equals(workflowInstance.getRecovery())) { @@ -2126,6 +2042,17 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { return taskExecuteRunnableMap; } + public Optional getTaskExecuteRunnableById(Integer taskInstanceId) { + if (taskInstanceId == null) { + throw new IllegalArgumentException("taskInstanceId can't be null"); + } + TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId); + if (taskInstance == null) { + return Optional.empty(); + } + return Optional.ofNullable(taskExecuteRunnableMap.get(taskInstance.getTaskCode())); + } + public Map getWaitToRetryTaskInstanceMap() { return waitToRetryTaskInstanceMap; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java index fcd2a5f721..7caed6ba75 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.command.CommandService; @@ -73,6 +74,9 @@ public class WorkflowExecuteRunnableFactory { @Autowired private ListenerEventAlertManager listenerEventAlertManager; + @Autowired + private TaskGroupCoordinator taskGroupCoordinator; + public Optional createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException { try { Optional workflowExecuteRunnableContextOptional = @@ -88,9 +92,10 @@ public class WorkflowExecuteRunnableFactory { curingGlobalParamsService, taskInstanceDao, defaultTaskExecuteRunnableFactory, - listenerEventAlertManager)); + listenerEventAlertManager, + taskGroupCoordinator)); } catch (Exception ex) { - throw new WorkflowCreateException("Create workflow execute runnable failed", ex); + throw new WorkflowCreateException("Create WorkflowExecuteRunnable failed", ex); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java new file mode 100644 index 0000000000..fae0d2b91f --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.taskgroup; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskGroup; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; +import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.time.StopWatch; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * The TaskGroupCoordinator use to manage the task group slot. The task group slot is used to limit the number of {@link TaskInstance} that can be run at the same time. + *

+ * The {@link TaskGroupQueue} is used to represent the task group slot. When a {@link TaskGroupQueue} which inQueue is YES means the {@link TaskGroupQueue} is using by a {@link TaskInstance}. + *

+ * When the {@link TaskInstance} need to use task group, we should use @{@link TaskGroupCoordinator#acquireTaskGroupSlot(TaskInstance)} to acquire the task group slot, + * this method doesn't block should always acquire successfully, and you should directly stop dispatch the task instance. + * When the task group slot is available, the TaskGroupCoordinator will wake up the waiting {@link TaskInstance} to dispatch. + *

+ *     if(needAcquireTaskGroupSlot(taskInstance)) {
+ *         taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
+ *         return;
+ *     }
+ * 
+ *

+ * When the {@link TaskInstance} is finished, we should use @{@link TaskGroupCoordinator#releaseTaskGroupSlot(TaskInstance)} to release the task group slot. + *

+ *     if(needToReleaseTaskGroupSlot(taskInstance)) {
+ *         taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
+ *     }
+ * 
+ */ +@Slf4j +@Component +public class TaskGroupCoordinator extends BaseDaemonThread { + + @Autowired + private RegistryClient registryClient; + + @Autowired + private TaskGroupDao taskGroupDao; + + @Autowired + private TaskGroupQueueDao taskGroupQueueDao; + + @Autowired + private TaskInstanceDao taskInstanceDao; + + @Autowired + private ProcessInstanceDao processInstanceDao; + + public TaskGroupCoordinator() { + super("TaskGroupCoordinator"); + } + + @Override + public synchronized void start() { + log.info("TaskGroupCoordinator starting..."); + super.start(); + log.info("TaskGroupCoordinator started..."); + } + + @Override + public void run() { + while (!ServerLifeCycleManager.isStopped()) { + try { + if (!ServerLifeCycleManager.isRunning()) { + continue; + } + try { + registryClient.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath()); + StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted(); + + amendTaskGroupUseSize(); + amendTaskGroupQueueStatus(); + dealWithForceStartTaskGroupQueue(); + dealWithWaitingTaskGroupQueue(); + + taskGroupCoordinatorRoundTimeCost.stop(); + log.info("TaskGroupCoordinator round time cost: {}/ms", + taskGroupCoordinatorRoundTimeCost.getTime()); + } finally { + registryClient.releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath()); + } + } catch (Throwable e) { + log.error("TaskGroupCoordinator error", e); + } finally { + // sleep 5s + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5); + } + } + } + + /** + * Make sure the TaskGroup useSize is equal to the TaskGroupQueue which status is {@link TaskGroupQueueStatus#ACQUIRE_SUCCESS} and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#NO}. + */ + private void amendTaskGroupUseSize() { + // The TaskGroup useSize should equal to the TaskGroupQueue which inQueue is YES and forceStart is NO + List taskGroups = taskGroupDao.queryAllTaskGroups(); + if (CollectionUtils.isEmpty(taskGroups)) { + return; + } + for (TaskGroup taskGroup : taskGroups) { + List taskGroupQueues = + taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(taskGroup.getId()); + int actualUseSize = taskGroupQueues.size(); + if (taskGroup.getUseSize() == actualUseSize) { + continue; + } + log.warn("The TaskGroup: {} useSize is {}, but the actual use size is {}, will amend it", + taskGroup.getName(), + taskGroup.getUseSize(), actualUseSize); + taskGroup.setUseSize(actualUseSize); + taskGroupDao.updateById(taskGroup); + } + } + + /** + * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished. + */ + private void amendTaskGroupQueueStatus() { + List taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue(); + List taskInstanceIds = taskGroupQueues.stream() + .map(TaskGroupQueue::getTaskId) + .collect(Collectors.toList()); + Map taskInstanceMap = taskInstanceDao.queryByIds(taskInstanceIds) + .stream() + .collect(Collectors.toMap(TaskInstance::getId, Function.identity())); + + for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { + int taskId = taskGroupQueue.getTaskId(); + TaskInstance taskInstance = taskInstanceMap.get(taskId); + + if (taskInstance == null) { + log.warn("The TaskInstance: {} is not exist, will release the TaskGroupQueue: {}", taskId, + taskGroupQueue); + releaseTaskGroupQueueSlot(taskGroupQueue); + continue; + } + + if (taskInstance.getState().isFinished()) { + log.warn("The TaskInstance: {} state: {} finished, will release the TaskGroupQueue: {}", + taskInstance.getName(), taskInstance.getState(), taskGroupQueue); + releaseTaskGroupQueueSlot(taskGroupQueue); + continue; + } + } + } + + private void dealWithForceStartTaskGroupQueue() { + // Find the force start task group queue(Which is inQueue and forceStart is YES) + // Notify the related waiting task instance + // Set the taskGroupQueue status to RELEASE and remove it from queue + List taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue() + .stream() + .filter(taskGroupQueue -> Flag.YES.getCode() == taskGroupQueue.getForceStart()) + .collect(Collectors.toList()); + for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { + try { + LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId()); + // notify the waiting task instance + // We notify first, it notify failed, the taskGroupQueue will be in queue, and then we will retry it + // next time. + notifyWaitingTaskInstance(taskGroupQueue); + log.info("Notify the ForceStart waiting TaskInstance: {} for taskGroupQueue: {} success", + taskGroupQueue.getTaskName(), + taskGroupQueue.getId()); + + taskGroupQueue.setInQueue(Flag.NO.getCode()); + taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE); + taskGroupQueue.setUpdateTime(new Date()); + taskGroupQueueDao.updateById(taskGroupQueue); + log.info("Release the force start TaskGroupQueue {}", taskGroupQueue); + } catch (UnsupportedOperationException unsupportedOperationException) { + releaseTaskGroupQueueSlot(taskGroupQueue); + log.info( + "Notify the ForceStart TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue", + taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException); + } catch (Throwable throwable) { + log.info("Notify the force start TaskGroupQueue {} failed", taskGroupQueue, throwable); + } finally { + LogUtils.removeTaskInstanceIdMDC(); + } + } + } + + private void dealWithWaitingTaskGroupQueue() { + // Find the TaskGroup which usage < maxSize. + // Find the highest priority inQueue task group queue(Which is inQueue and status is Waiting and force start is + // NO) belong to the + // task group. + List taskGroups = taskGroupDao.queryAvailableTaskGroups(); + if (CollectionUtils.isEmpty(taskGroups)) { + log.debug("There is no available task group"); + return; + } + for (TaskGroup taskGroup : taskGroups) { + int availableSize = taskGroup.getGroupSize() - taskGroup.getUseSize(); + if (availableSize <= 0) { + log.info("TaskGroup {} is full, available size is {}", taskGroup, availableSize); + continue; + } + List taskGroupQueues = + taskGroupQueueDao.queryAllInQueueTaskGroupQueueByGroupId(taskGroup.getId()) + .stream() + .filter(taskGroupQueue -> Flag.NO.getCode() == taskGroupQueue.getForceStart()) + .filter(taskGroupQueue -> TaskGroupQueueStatus.WAIT_QUEUE == taskGroupQueue.getStatus()) + .limit(availableSize) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(taskGroupQueues)) { + log.debug("There is no waiting task group queue for task group {}", taskGroup.getName()); + continue; + } + for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { + try { + LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId()); + // Reduce the taskGroupSize + boolean acquireResult = taskGroupDao.acquireTaskGroupSlot(taskGroup.getId()); + if (!acquireResult) { + log.error("Failed to acquire task group slot for task group {}", taskGroup); + continue; + } + // Notify the waiting task instance + // We notify first, it notify failed, the taskGroupQueue will be in queue, and then we will retry it + // next time. + notifyWaitingTaskInstance(taskGroupQueue); + + // Set the taskGroupQueue status to RUNNING and remove from queue + taskGroupQueue.setInQueue(Flag.YES.getCode()); + taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueue.setUpdateTime(new Date()); + taskGroupQueueDao.updateById(taskGroupQueue); + } catch (UnsupportedOperationException unsupportedOperationException) { + releaseTaskGroupQueueSlot(taskGroupQueue); + log.info( + "Notify the Waiting TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue", + taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException); + } catch (Throwable throwable) { + log.error("Notify Waiting TaskGroupQueue: {} failed", taskGroupQueue, throwable); + } finally { + LogUtils.removeTaskInstanceIdMDC(); + } + } + } + } + + /** + * If the {@link TaskInstance#getTaskGroupId()} > 0, and the TaskGroup flag is {@link Flag#YES} then the task instance need to use task group. + * + * @param taskInstance task instance + * @return true if the TaskInstance need to acquireTaskGroupSlot + */ + public boolean needAcquireTaskGroupSlot(TaskInstance taskInstance) { + if (taskInstance == null) { + throw new IllegalArgumentException("The TaskInstance is null"); + } + if (taskInstance.getTaskGroupId() <= 0) { + log.debug("The current TaskInstance doesn't use TaskGroup, no need to acquire TaskGroupSlot"); + return false; + } + TaskGroup taskGroup = taskGroupDao.queryById(taskInstance.getTaskGroupId()); + if (taskGroup == null) { + log.warn("The current TaskGroup: {} does not exist, will not acquire TaskGroupSlot", + taskInstance.getTaskGroupId()); + return false; + } + return Flag.YES.equals(taskGroup.getStatus()); + } + + /** + * Acquire the task group slot for the given {@link TaskInstance}. + *

+ * When taskInstance want to acquire a TaskGroup slot, should call this method. If acquire successfully, will create a TaskGroupQueue in db which is in queue and status is {@link TaskGroupQueueStatus#WAIT_QUEUE}. + * The TaskInstance shouldn't dispatch until there exist available slot, the taskGroupCoordinator notify it. + * + * @param taskInstance the task instance which want to acquire task group slot. + * @throws IllegalArgumentException if the taskInstance is null or the used taskGroup doesn't exist. + */ + public void acquireTaskGroupSlot(TaskInstance taskInstance) { + if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) { + throw new IllegalArgumentException("The current TaskInstance does not use task group"); + } + TaskGroup taskGroup = taskGroupDao.queryById(taskInstance.getTaskGroupId()); + if (taskGroup == null) { + throw new IllegalArgumentException( + "The current TaskGroup: " + taskInstance.getTaskGroupId() + " does not exist"); + } + // Write TaskGroupQueue in db, and then return wait TaskGroupCoordinator to notify it + // Set the taskGroupQueue status to WAIT_QUEUE and add to queue + // The queue only contains the taskGroupQueue which status is WAIT_QUEUE or ACQUIRE_SUCCESS + Date now = new Date(); + TaskGroupQueue taskGroupQueue = TaskGroupQueue + .builder() + .taskId(taskInstance.getId()) + .taskName(taskInstance.getName()) + .groupId(taskInstance.getTaskGroupId()) + .processId(taskInstance.getProcessInstanceId()) + .priority(taskInstance.getTaskGroupPriority()) + .inQueue(Flag.YES.getCode()) + .forceStart(Flag.NO.getCode()) + .status(TaskGroupQueueStatus.WAIT_QUEUE) + .createTime(now) + .updateTime(now) + .build(); + log.info("Success insert TaskGroupQueue: {} for TaskInstance: {}", taskGroupQueue, taskInstance.getName()); + taskGroupQueueDao.insert(taskGroupQueue); + } + + /** + * If the TaskInstance is using TaskGroup then it need to release TaskGroupSlot. + * + * @param taskInstance taskInsatnce + * @return true if the TaskInstance need to release TaskGroupSlot + */ + public boolean needToReleaseTaskGroupSlot(TaskInstance taskInstance) { + if (taskInstance == null) { + throw new IllegalArgumentException("The TaskInstance is null"); + } + if (taskInstance.getTaskGroupId() <= 0) { + log.debug("The current TaskInstance doesn't use TaskGroup, no need to release TaskGroupSlot"); + return false; + } + return true; + } + + /** + * Release the task group slot for the given {@link TaskInstance}. + *

+ * When taskInstance want to release a TaskGroup slot, should call this method. The release method will move the TaskGroupQueue out queue and set status to {@link TaskGroupQueueStatus#RELEASE}. + * This method is idempotent, this means that if the task group slot is already released, this method will do nothing. + * + * @param taskInstance the task instance which want to release task group slot. + * @throws IllegalArgumentException If the taskInstance is null or the task doesn't use task group. + */ + public void releaseTaskGroupSlot(TaskInstance taskInstance) { + if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) { + throw new IllegalArgumentException("The current TaskInstance does not use task group"); + } + List taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId()); + for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { + releaseTaskGroupQueueSlot(taskGroupQueue); + } + } + + private void notifyWaitingTaskInstance(TaskGroupQueue taskGroupQueue) { + // Find the related waiting task instance + // send RPC to notify the waiting task instance + TaskInstance taskInstance = taskInstanceDao.queryById(taskGroupQueue.getTaskId()); + if (taskInstance == null) { + throw new UnsupportedOperationException( + "The TaskInstance: " + taskGroupQueue.getTaskId() + " is not exist, no need to notify"); + } + // todo: We may need to add a new status to represent the task instance is waiting for task group slot + if (taskInstance.getState() != TaskExecutionStatus.SUBMITTED_SUCCESS) { + throw new UnsupportedOperationException( + "The TaskInstance: " + taskInstance.getId() + " state is " + taskInstance.getState() + + ", no need to notify"); + } + ProcessInstance processInstance = processInstanceDao.queryById(taskInstance.getProcessInstanceId()); + if (processInstance == null) { + throw new UnsupportedOperationException( + "The WorkflowInstance: " + taskInstance.getProcessInstanceId() + + " is not exist, no need to notify"); + } + if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) { + throw new UnsupportedOperationException( + "The WorkflowInstance: " + processInstance.getId() + " state is " + processInstance.getState() + + ", no need to notify"); + } + if (processInstance.getHost() == null || Constants.NULL.equals(processInstance.getHost())) { + throw new UnsupportedOperationException( + "WorkflowInstance host is null, maybe it is in failover: " + processInstance); + } + + TaskInstanceWakeupRequest taskInstanceWakeupRequest = TaskInstanceWakeupRequest.builder() + .processInstanceId(processInstance.getId()) + .taskInstanceId(taskInstance.getId()) + .build(); + + IWorkflowInstanceService iWorkflowInstanceService = SingletonJdkDynamicRpcClientProxyFactory + .getProxyClient(processInstance.getHost(), IWorkflowInstanceService.class); + TaskInstanceWakeupResponse taskInstanceWakeupResponse = + iWorkflowInstanceService.wakeupTaskInstance(taskInstanceWakeupRequest); + if (!taskInstanceWakeupResponse.isSuccess()) { + throw new UnsupportedOperationException( + "Notify TaskInstance: " + taskInstance.getId() + " failed: " + taskInstanceWakeupResponse); + } + log.info("Wake up TaskInstance: {} success", taskInstance.getName()); + } + + private void releaseTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) { + if (TaskGroupQueueStatus.RELEASE.equals(taskGroupQueue.getStatus()) + && Flag.NO.getCode() == taskGroupQueue.getInQueue()) { + log.info("The TaskGroupQueue: {} is already released", taskGroupQueue); + return; + } + taskGroupQueue.setInQueue(Flag.NO.getCode()); + taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE); + taskGroupQueue.setUpdateTime(new Date()); + taskGroupQueueDao.updateById(taskGroupQueue); + log.info("Success release TaskGroupQueue: {}", taskGroupQueue); + } + +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index 28c0010e3b..c08fb206f4 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -49,7 +50,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.text.ParseException; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -104,6 +104,8 @@ public class WorkflowExecuteRunnableTest { private ListenerEventAlertManager listenerEventAlertManager; + private TaskGroupCoordinator taskGroupCoordinator; + @BeforeEach public void init() throws Exception { applicationContext = Mockito.mock(ApplicationContext.class); @@ -138,6 +140,8 @@ public class WorkflowExecuteRunnableTest { Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph); Mockito.when(workflowGraph.getDag()).thenReturn(new DAG<>()); + taskGroupCoordinator = Mockito.mock(TaskGroupCoordinator.class); + workflowExecuteThread = Mockito.spy( new WorkflowExecuteRunnable( workflowExecuteContext, @@ -150,11 +154,12 @@ public class WorkflowExecuteRunnableTest { curingGlobalParamsService, taskInstanceDao, defaultTaskExecuteRunnableFactory, - listenerEventAlertManager)); + listenerEventAlertManager, + taskGroupCoordinator)); } @Test - public void testParseStartNodeName() throws ParseException { + public void testParseStartNodeName() { try { Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_START_NODES, "1,2,3"); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java new file mode 100644 index 0000000000..f654550b75 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.taskgroup; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.dao.entity.TaskGroup; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; + +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import com.google.common.collect.Lists; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class TaskGroupCoordinatorTest { + + @InjectMocks + private TaskGroupCoordinator taskGroupCoordinator; + + @Mock + private RegistryClient registryClient; + + @Mock + private TaskGroupDao taskGroupDao; + + @Mock + private TaskGroupQueueDao taskGroupQueueDao; + + @Mock + private TaskInstanceDao taskInstanceDao; + + @Mock + private ProcessInstanceDao processInstanceDao; + + @Test + void start() throws InterruptedException { + // Get the Lock from Registry + taskGroupCoordinator.start(); + Thread.sleep(1_000); + verify(registryClient, Mockito.times(1)) + .getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath()); + verify(registryClient, Mockito.times(1)) + .releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath()); + + } + + @Test + void needAcquireTaskGroupSlot() { + // TaskInstance is null + IllegalArgumentException illegalArgumentException = + assertThrows(IllegalArgumentException.class, () -> taskGroupCoordinator.needAcquireTaskGroupSlot(null)); + assertEquals("The TaskInstance is null", illegalArgumentException.getMessage()); + + // TaskGroupId < 0 + TaskInstance taskInstance = new TaskInstance(); + assertFalse(taskGroupCoordinator.needAcquireTaskGroupSlot(taskInstance)); + + // TaskGroup not exist + taskInstance.setTaskGroupId(1); + when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(null); + assertFalse(taskGroupCoordinator.needAcquireTaskGroupSlot(taskInstance)); + + // TaskGroup is closed + TaskGroup taskGroup = new TaskGroup(); + taskGroup.setStatus(Flag.NO); + when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(taskGroup); + assertFalse(taskGroupCoordinator.needAcquireTaskGroupSlot(taskInstance)); + + // TaskGroup is open + taskGroup.setStatus(Flag.YES); + when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(taskGroup); + assertTrue(taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance)); + + } + + @Test + void acquireTaskGroupSlot() { + // TaskInstance is NULL + IllegalArgumentException illegalArgumentException = + assertThrows(IllegalArgumentException.class, () -> taskGroupCoordinator.acquireTaskGroupSlot(null)); + assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage()); + + // TaskGroupId is NULL + TaskInstance taskInstance = new TaskInstance(); + illegalArgumentException = assertThrows(IllegalArgumentException.class, + () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance)); + assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage()); + + // TaskGroup not exist + taskInstance.setTaskGroupId(1); + taskInstance.setId(1); + when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(null); + illegalArgumentException = assertThrows(IllegalArgumentException.class, + () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance)); + assertEquals("The current TaskGroup: 1 does not exist", illegalArgumentException.getMessage()); + + // TaskGroup exist + when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(new TaskGroup()); + Assertions.assertDoesNotThrow(() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance)); + + } + + @Test + void needToReleaseTaskGroupSlot() { + IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class, + () -> taskGroupCoordinator.needToReleaseTaskGroupSlot(null)); + assertEquals("The TaskInstance is null", illegalArgumentException.getMessage()); + + TaskInstance taskInstance = new TaskInstance(); + assertFalse(taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance)); + + taskInstance.setTaskGroupId(1); + assertTrue(taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance)); + } + + @Test + void releaseTaskGroupSlot() { + // TaskInstance is NULL + IllegalArgumentException illegalArgumentException = + assertThrows(IllegalArgumentException.class, () -> taskGroupCoordinator.releaseTaskGroupSlot(null)); + assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage()); + + // TaskGroupId is NULL + TaskInstance taskInstance = new TaskInstance(); + illegalArgumentException = assertThrows(IllegalArgumentException.class, + () -> taskGroupCoordinator.releaseTaskGroupSlot(taskInstance)); + assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage()); + + // Release TaskGroupQueue + taskInstance.setId(1); + taskInstance.setTaskGroupId(1); + TaskGroupQueue taskGroupQueue = new TaskGroupQueue(); + List taskGroupQueues = Lists.newArrayList(taskGroupQueue); + when(taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId())).thenReturn(taskGroupQueues); + taskGroupCoordinator.releaseTaskGroupSlot(taskInstance); + + assertEquals(Flag.NO.getCode(), taskGroupQueue.getInQueue()); + assertEquals(TaskGroupQueueStatus.RELEASE, taskGroupQueue.getStatus()); + verify(taskGroupQueueDao, Mockito.times(1)).updateById(taskGroupQueue); + + } +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java index f5c07e66f6..a1f3bb02b7 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java @@ -28,6 +28,7 @@ public enum RegistryNodeType { MASTER("Master", "/nodes/master"), MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"), MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"), + MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"), WORKER("Worker", "/nodes/worker"), ALERT_SERVER("AlertServer", "/nodes/alert-server"), ALERT_LOCK("AlertNodeLock", "/lock/alert"), diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 70d2593a32..091d6353fb 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; -import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; @@ -51,7 +50,6 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.model.TaskNode; import java.util.List; -import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; @@ -72,8 +70,6 @@ public interface ProcessService { ProcessInstance findProcessInstanceById(int processId); - ProcessDefinition findProcessDefineById(int processDefinitionId); - ProcessDefinition findProcessDefinition(Long processDefinitionCode, int processDefinitionVersion); ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode); @@ -92,9 +88,6 @@ public interface ProcessService { void setSubProcessParam(ProcessInstance subProcessInstance); - boolean submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, - long commitInterval); - @Transactional boolean submitTask(ProcessInstance processInstance, TaskInstance taskInstance); @@ -129,18 +122,10 @@ public interface ProcessService { DataSource findDataSourceById(int id); - ProcessInstance findProcessInstanceByTaskId(int taskId); - List queryUdfFunListByIds(Integer[] ids); - List selectAllByProcessDefineCode(long[] codes); - - String queryUserQueueByProcessInstance(ProcessInstance processInstance); - ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId); - List getProjectListHavePerm(int userId); - List listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType); User getUserById(int userId); @@ -175,8 +160,6 @@ public interface ProcessService { List transformTask(List taskRelationList, List taskDefinitionLogs); - Map notifyProcessList(int processId); - DqExecuteResult getDqExecuteResultByTaskInstanceId(int taskInstanceId); int updateDqExecuteResultUserId(int taskInstanceId); @@ -195,16 +178,6 @@ public interface ProcessService { DqComparisonType getComparisonTypeById(int id); - boolean acquireTaskGroup(int taskId, - String taskName, int groupId, - int processId, int priority); - - boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue); - - void releaseAllTaskGroup(int processInstanceId); - - TaskInstance releaseTaskGroup(TaskInstance taskInstance); - void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status); TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, @@ -214,12 +187,6 @@ public interface ProcessService { Integer priority, TaskGroupQueueStatus status); - int updateTaskGroupQueueStatus(Integer taskId, int status); - - int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue); - - TaskGroupQueue loadTaskGroupQueue(int taskId); - ProcessInstance loadNextProcess4Serial(long code, int state, int id); public String findConfigYamlByName(String clusterName); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 706e183cf0..93b257ba27 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -67,12 +67,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; -import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; -import org.apache.dolphinscheduler.dao.entity.TaskGroup; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -410,17 +408,6 @@ public class ProcessServiceImpl implements ProcessService { return processInstanceMapper.selectById(processId); } - /** - * find process define by id. - * - * @param processDefinitionId processDefinitionId - * @return process definition - */ - @Override - public ProcessDefinition findProcessDefineById(int processDefinitionId) { - return processDefineMapper.selectById(processDefinitionId); - } - /** * find process define by code and version. * @@ -1080,34 +1067,6 @@ public class ProcessServiceImpl implements ProcessService { taskInstanceDao.updateById(taskInstance); } - /** - * retry submit task to db - */ - @Override - public boolean submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, - int commitRetryTimes, long commitInterval) { - int retryTimes = 1; - while (retryTimes <= commitRetryTimes) { - try { - // submit task to db - // Only want to use transaction here - if (submitTask(processInstance, taskInstance)) { - return true; - } - log.error( - "task commit to db failed , taskCode: {} has already retry {} times, please check the database", - taskInstance.getTaskCode(), - retryTimes); - Thread.sleep(commitInterval); - } catch (Exception e) { - log.error("task commit to db failed", e); - } finally { - retryTimes += 1; - } - } - return false; - } - /** * // todo: This method need to refactor, we find when the db down, but the taskInstanceId is not 0. It's better to change to void, rather than return TaskInstance * submit task to db @@ -1260,20 +1219,6 @@ public class ProcessServiceImpl implements ProcessService { } } - /** - * get sub work flow command type - * child instance exist: child command = fatherCommand - * child instance not exists: child command = fatherCommand[0] - */ - private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) { - CommandType commandType = parentProcessInstance.getCommandType(); - if (childInstance == null) { - String fatherHistoryCommand = parentProcessInstance.getHistoryCmd(); - commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); - } - return commandType; - } - /** * update sub process definition * @@ -1573,21 +1518,6 @@ public class ProcessServiceImpl implements ProcessService { return dataSourceMapper.selectById(id); } - /** - * find process instance by the task id - * - * @param taskId taskId - * @return process instance - */ - @Override - public ProcessInstance findProcessInstanceByTaskId(int taskId) { - TaskInstance taskInstance = taskInstanceMapper.selectById(taskId); - if (taskInstance != null) { - return processInstanceMapper.selectById(taskInstance.getProcessInstanceId()); - } - return null; - } - /** * find udf function list by id list string * @@ -1599,37 +1529,6 @@ public class ProcessServiceImpl implements ProcessService { return udfFuncMapper.queryUdfByIdStr(ids, null); } - /** - * find schedule list by process define codes. - * - * @param codes codes - * @return schedule list - */ - @Override - public List selectAllByProcessDefineCode(long[] codes) { - return scheduleMapper.selectAllByProcessDefineArray(codes); - } - - /** - * query user queue by process instance - * - * @param processInstance processInstance - * @return queue - */ - @Override - public String queryUserQueueByProcessInstance(ProcessInstance processInstance) { - - String queue = ""; - if (processInstance == null) { - return queue; - } - User executor = userMapper.selectById(processInstance.getExecutorId()); - if (executor != null) { - queue = executor.getQueue(); - } - return queue; - } - /** * query project name and user name by processInstanceId. * @@ -1641,27 +1540,6 @@ public class ProcessServiceImpl implements ProcessService { return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId); } - /** - * get have perm project list - * - * @param userId userId - * @return project list - */ - @Override - public List getProjectListHavePerm(int userId) { - List createProjects = projectMapper.queryProjectCreatedByUser(userId); - List authedProjects = projectMapper.queryAuthedProjectListByUserId(userId); - - if (createProjects == null) { - createProjects = new ArrayList<>(); - } - - if (authedProjects != null) { - createProjects.addAll(authedProjects); - } - return createProjects; - } - /** * list unauthorized udf function * @@ -2108,23 +1986,6 @@ public class ProcessServiceImpl implements ProcessService { return taskNodeList; } - @Override - public Map notifyProcessList(int processId) { - HashMap processTaskMap = new HashMap<>(); - // find sub tasks - ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(processId); - if (processInstanceMap == null) { - return processTaskMap; - } - ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId()); - TaskInstance fatherTask = taskInstanceDao.queryById(processInstanceMap.getParentTaskInstanceId()); - - if (fatherProcess != null) { - processTaskMap.put(fatherProcess, fatherTask); - } - return processTaskMap; - } - @Override public DqExecuteResult getDqExecuteResultByTaskInstanceId(int taskInstanceId) { return dqExecuteResultMapper.getExecuteResultById(taskInstanceId); @@ -2195,167 +2056,6 @@ public class ProcessServiceImpl implements ProcessService { return dqComparisonTypeMapper.selectById(id); } - /** - * the first time (when submit the task ) get the resource of the task group - */ - @Override - public boolean acquireTaskGroup(int taskInstanceId, - String taskName, - int taskGroupId, - int workflowInstanceId, - int taskGroupPriority) { - TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupId); - if (taskGroup == null) { - // we don't throw exception here, to avoid the task group has been deleted during workflow running - log.warn("The taskGroup is not exist no need to acquire taskGroup, taskGroupId: {}", taskGroupId); - return true; - } - // if task group is not applicable - if (taskGroup.getStatus() == Flag.NO.getCode()) { - log.warn("The taskGroup status is {}, no need to acquire taskGroup, taskGroupId: {}", taskGroup.getStatus(), - taskGroupId); - return true; - } - // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS - TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstanceId); - if (taskGroupQueue == null) { - taskGroupQueue = insertIntoTaskGroupQueue( - taskInstanceId, - taskName, - taskGroupId, - workflowInstanceId, - taskGroupPriority, - TaskGroupQueueStatus.WAIT_QUEUE); - log.info("Insert TaskGroupQueue: {} successfully", taskGroupQueue.getId()); - } else { - log.info("The task queue is already exist, taskId: {}", taskInstanceId); - if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) { - return true; - } - } - // check if there already exist higher priority tasks - List highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks( - taskGroupId, - taskGroupPriority, - TaskGroupQueueStatus.WAIT_QUEUE.getCode()); - if (CollectionUtils.isNotEmpty(highPriorityTasks)) { - return false; - } - // try to get taskGroup - int availableTaskGroupCount = taskGroupMapper.selectAvailableCountById(taskGroupId); - if (availableTaskGroupCount < 1) { - log.info( - "Failed to acquire taskGroup, there is no avaliable taskGroup, taskInstanceId: {}, taskGroupId: {}", - taskInstanceId, taskGroupId); - taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); - return false; - } - return robTaskGroupResource(taskGroupQueue); - } - - /** - * try to get the task group resource(when other task release the resource) - */ - @Override - public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) { - // set the default max size to avoid dead loop - for (int i = 0; i < 10; i++) { - TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); - if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) { - // remove - taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); - log.info("The current task Group is full, taskGroup: {}", taskGroup); - return false; - } - int affectedCount = taskGroupMapper.robTaskGroupResource( - taskGroup.getId(), - taskGroup.getUseSize(), - taskGroupQueue.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode()); - if (affectedCount > 0) { - log.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), - taskGroupQueue.getId()); - taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); - this.taskGroupQueueMapper.updateById(taskGroupQueue); - this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); - return true; - } - } - log.info("Failed to rob taskGroup, taskGroupQueue: {}", taskGroupQueue); - taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); - return false; - } - - @Override - public void releaseAllTaskGroup(int processInstanceId) { - List taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); - for (TaskInstance info : taskInstances) { - releaseTaskGroup(info); - } - } - - /** - * release the TGQ resource when the corresponding task is finished. - * - * @return the result code and msg - */ - @Override - public TaskInstance releaseTaskGroup(TaskInstance taskInstance) { - - TaskGroup taskGroup; - TaskGroupQueue thisTaskGroupQueue; - log.info("Begin to release task group: {}", taskInstance.getTaskGroupId()); - try { - do { - taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); - if (taskGroup == null) { - log.error("The taskGroup is not exist no need to release taskGroup, taskGroupId: {}", - taskInstance.getTaskGroupId()); - return null; - } - thisTaskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); - if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { - log.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId()); - return null; - } - if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE) { - log.info("The taskGroupQueue's status is in waiting, will not need to release task group"); - break; - } - } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() - && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), - taskGroup.getUseSize(), - thisTaskGroupQueue.getId(), - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1); - } catch (Exception e) { - log.error("release the task group error", e); - return null; - } - log.info("Finished to release task group, taskGroupId: {}", taskInstance.getTaskGroupId()); - - log.info("Begin to release task group queue, taskGroupId: {}", taskInstance.getTaskGroupId()); - changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); - TaskGroupQueue taskGroupQueue; - do { - taskGroupQueue = taskGroupQueueMapper.queryTheHighestPriorityTasks( - taskGroup.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode(), - Flag.NO.getCode(), - Flag.NO.getCode()); - if (taskGroupQueue == null) { - log.info("There is no taskGroupQueue need to be wakeup taskGroup: {}", taskGroup.getId()); - return null; - } - } while (this.taskGroupQueueMapper.updateInQueueCAS( - Flag.NO.getCode(), - Flag.YES.getCode(), - taskGroupQueue.getId()) != 1); - log.info("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}", - taskInstance.getTaskGroupId(), taskGroupQueue.getId()); - return taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); - } - /** * release the TGQ resource when the corresponding task is finished. * @@ -2396,21 +2096,6 @@ public class ProcessServiceImpl implements ProcessService { return taskGroupQueue; } - @Override - public int updateTaskGroupQueueStatus(Integer taskId, int status) { - return taskGroupQueueMapper.updateStatusByTaskId(taskId, status); - } - - @Override - public int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue) { - return taskGroupQueueMapper.updateById(taskGroupQueue); - } - - @Override - public TaskGroupQueue loadTaskGroupQueue(int taskId) { - return this.taskGroupQueueMapper.queryByTaskId(taskId); - } - @Override public ProcessInstance loadNextProcess4Serial(long code, int state, int id) { return this.processInstanceMapper.loadNextProcess4Serial(code, state, id); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 70157f6a05..5e20c5ea2b 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -749,22 +749,6 @@ public class ProcessServiceTest { Assertions.assertNotNull(taskGroupQueue); } - @Test - public void testDoRelease() { - - TaskGroupQueue taskGroupQueue = getTaskGroupQueue(); - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(1); - taskInstance.setProcessInstanceId(1); - taskInstance.setTaskGroupId(taskGroupQueue.getGroupId()); - - when(taskGroupQueueMapper.queryByTaskId(1)).thenReturn(taskGroupQueue); - when(taskGroupQueueMapper.updateById(taskGroupQueue)).thenReturn(1); - - processService.releaseTaskGroup(taskInstance); - - } - private TaskGroupQueue getTaskGroupQueue() { TaskGroupQueue taskGroupQueue = new TaskGroupQueue(); taskGroupQueue.setTaskName("task name");