Browse Source

Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time (#15528)

dev_wenjun_refactorMaster
Wenjun Ruan 10 months ago committed by GitHub
parent
commit
89742332a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      .github/workflows/backend.yml
  2. 26
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java
  3. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
  4. 49
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  5. 18
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
  6. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
  7. 9
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
  8. 33
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
  9. 32
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
  10. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
  11. 62
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java
  12. 64
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java
  13. 70
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java
  14. 73
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java
  15. 58
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
  16. 61
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
  17. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  18. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  19. 2
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  20. 17
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql
  21. 4
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql
  22. 17
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
  23. 4
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
  24. 9
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java
  25. 118
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImplTest.java
  26. 108
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java
  27. 9
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java
  28. 5
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java
  29. 42
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java
  30. 41
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java
  31. 19
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java
  32. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  33. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java
  34. 11
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
  35. 47
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
  36. 13
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java
  37. 14
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java
  38. 58
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java
  39. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java
  40. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java
  41. 147
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  42. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java
  43. 451
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
  44. 11
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
  45. 182
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
  46. 1
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
  47. 33
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  48. 315
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  49. 16
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

2
.github/workflows/backend.yml

@ -149,7 +149,7 @@ jobs:
fail-fast: false
matrix:
db: ["mysql", "postgresql"]
version: ["2.0.9", "3.0.6", "3.1.8", "3.2.0"]
version: ["2.0.9", "3.0.6", "3.1.9", "3.2.0"]
steps:
- name: Set up JDK 8
uses: actions/setup-java@v2

26
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java

@ -298,7 +298,7 @@ public class TaskGroupController extends BaseController {
* @param pageSize page size
* @return queue list
*/
@Operation(summary = "queryTasksByGroupId", description = "QUERY_ALL_TASKS_GROUP_NOTES")
@Operation(summary = "queryTaskGroupQueuesByGroupId", description = "QUERY_TASKS_GROUP_GROUP_QUEUES")
@Parameters({
@Parameter(name = "groupId", description = "GROUP_ID", required = false, schema = @Schema(implementation = int.class, example = "1", defaultValue = "-1")),
@Parameter(name = "taskInstanceName", description = "TASK_INSTANCE_NAME", required = false, schema = @Schema(implementation = String.class, example = "taskName")),
@ -310,15 +310,21 @@ public class TaskGroupController extends BaseController {
@GetMapping(value = "/query-list-by-group-id")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_QUEUE_LIST_ERROR)
public Result queryTasksByGroupId(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName, processName, status,
groupId, pageNo, pageSize);
public Result queryTaskGroupQueues(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(
loginUser,
taskName,
processName,
status,
groupId,
pageNo,
pageSize);
return returnDataList(result);
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java

@ -50,14 +50,6 @@ public interface TaskGroupService {
Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
String description, int groupSize);
/**
* get task group status
*
* @param id task group id
* @return the result code and msg
*/
boolean isTheTaskGroupAvailable(int id);
/**
* query all task group by user id
*

49
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -55,7 +55,6 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
@ -83,14 +82,12 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.master.IStreamingTaskOperator;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.service.command.CommandService;
@ -552,16 +549,20 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<String, Object> result = new HashMap<>();
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.selectById(queueId);
// check process instance exist
ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
if (processInstance == null) {
log.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.",
taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
return result;
ProcessInstance processInstance = processInstanceDao.queryOptionalById(taskGroupQueue.getProcessId())
.orElseThrow(
() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId()));
checkMasterExists();
if (taskGroupQueue.getInQueue() == Flag.NO.getCode()) {
throw new ServiceException(Status.TASK_GROUP_QUEUE_ALREADY_START);
}
taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueMapper.updateById(taskGroupQueue);
checkMasterExists();
return forceStart(processInstance, taskGroupQueue);
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) {
@ -664,32 +665,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
/**
* prepare to update process instance command type and status
*
* @param processInstance process instance
* @return update result
*/
private Map<String, Object> forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) {
Map<String, Object> result = new HashMap<>();
if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
log.warn("Task group queue already starts, taskGroupQueueId:{}.", taskGroupQueue.getId());
putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START);
return result;
}
taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
processService.updateTaskGroupQueue(taskGroupQueue);
log.info("Sending force start command to master: {}.", processInstance.getHost());
ILogicTaskInstanceOperator iLogicTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class);
iLogicTaskInstanceOperator.forceStartTaskInstance(
new TaskInstanceForceStartRequest(processInstance.getId(), taskGroupQueue.getTaskId()));
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check whether sub processes are offline before starting process definition
*

18
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java

@ -66,8 +66,13 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
* @return tasks list
*/
@Override
public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName, String processName, Integer status,
int groupId, int pageNo, int pageSize) {
public Map<String, Object> queryTasksByGroupId(User loginUser,
String taskName,
String processName,
Integer status,
int groupId,
int pageNo,
int pageSize) {
Map<String, Object> result = new HashMap<>();
Page<TaskGroupQueue> page = new Page<>(pageNo, pageSize);
PageInfo<TaskGroupQueue> pageInfo = new PageInfo<>(pageNo, pageSize);
@ -79,8 +84,13 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
return result;
}
List<Project> projects = projectMapper.selectBatchIds(projectIds);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page,
taskName, processName, status, groupId, projects);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(
page,
taskName,
processName,
status,
groupId,
projects);
pageInfo.setTotal((int) taskGroupQueue.getTotal());
pageInfo.setTotalList(taskGroupQueue.getRecords());

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java

@ -122,7 +122,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
.description(description)
.groupSize(groupSize)
.userId(loginUser.getId())
.status(Flag.YES.getCode())
.status(Flag.YES)
.createTime(now)
.updateTime(now)
.build();
@ -180,7 +180,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
if (taskGroup.getStatus() != Flag.YES.getCode()) {
if (taskGroup.getStatus() != Flag.YES) {
log.warn("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_ERROR);
return result;
@ -202,17 +202,6 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
return result;
}
/**
* get task group status
*
* @param id task group id
* @return is the task group available
*/
@Override
public boolean isTheTaskGroupAvailable(int id) {
return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1;
}
/**
* query all task group by user id
*
@ -331,12 +320,12 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.NO.getCode()) {
if (taskGroup.getStatus() == Flag.NO) {
log.info("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_CLOSED);
return result;
}
taskGroup.setStatus(Flag.NO.getCode());
taskGroup.setStatus(Flag.NO);
int update = taskGroupMapper.updateById(taskGroup);
if (update > 0)
log.info("Task group close complete, taskGroupId:{}.", id);
@ -364,12 +353,12 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.YES.getCode()) {
if (taskGroup.getStatus() == Flag.YES) {
log.info("Task group has been started, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_OPENED);
return result;
}
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setStatus(Flag.YES);
taskGroup.setUpdateTime(new Date(System.currentTimeMillis()));
int update = taskGroupMapper.updateById(taskGroup);
if (update > 0)

9
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java

@ -125,7 +125,7 @@ public class TaskGroupServiceTest {
.description(taskGroupDesc)
.groupSize(100)
.userId(1)
.status(Flag.YES.getCode())
.status(Flag.YES)
.build();
return taskGroup;
@ -204,7 +204,7 @@ public class TaskGroupServiceTest {
User loginUser = getLoginUser();
TaskGroup taskGroup = getTaskGroup();
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setStatus(Flag.YES);
// Task group status error
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.TASK_GROUP,
@ -218,7 +218,6 @@ public class TaskGroupServiceTest {
logger.info(result.toString());
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
taskGroup.setStatus(0);
}
@Test
@ -236,12 +235,12 @@ public class TaskGroupServiceTest {
Map<String, Object> result = taskGroupService.closeTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
taskGroup.setStatus(0);
taskGroup.setStatus(Flag.NO);
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
result = taskGroupService.closeTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.TASK_GROUP_STATUS_CLOSED, result.get(Constants.STATUS));
taskGroup.setStatus(1);
taskGroup.setStatus(Flag.YES);
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
result = taskGroupService.startTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.TASK_GROUP_STATUS_OPENED, result.get(Constants.STATUS));

33
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.Flag;
import java.io.Serializable;
import java.util.Date;
@ -36,44 +38,17 @@ import com.baomidou.mybatisplus.annotation.TableName;
@TableName("t_ds_task_group")
public class TaskGroup implements Serializable {
/**
* key
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* task_group name
*/
private String name;
private long projectCode;
private String description;
/**
* 作业组大小
*/
private int groupSize;
/**
* 已使用作业组大小
*/
private int useSize;
/**
* creator id
*/
private int userId;
/**
* 0 not available, 1 available
*/
private Integer status;
/**
* create time
*/
private Flag status;
private Date createTime;
/**
* update time
*/
private Date updateTime;
/**
* project code
*/
private long projectCode;
}

32
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java

@ -35,20 +35,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
*/
public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
int robTaskGroupResource(@Param("id") int id,
@Param("currentUseSize") int currentUseSize,
@Param("queueId") int queueId,
@Param("queueStatus") int queueStatus);
/**
* update table of task group
*
* @param id primary key
* @return affected rows
*/
int releaseTaskGroupResource(@Param("id") int id, @Param("useSize") int useSize,
@Param("queueId") int queueId, @Param("queueStatus") int queueStatus);
/**
* select task groups paging
*
@ -57,7 +43,8 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
* @param status status
* @return result page
*/
IPage<TaskGroup> queryTaskGroupPaging(IPage<TaskGroup> page, @Param("name") String name,
IPage<TaskGroup> queryTaskGroupPaging(IPage<TaskGroup> page,
@Param("name") String name,
@Param("status") Integer status);
/**
@ -69,13 +56,6 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
*/
TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name);
/**
* Select the groupSize > useSize Count
*/
int selectAvailableCountById(@Param("groupId") int groupId);
int selectCountByIdStatus(@Param("id") int id, @Param("status") int status);
IPage<TaskGroup> queryTaskGroupPagingByProjectCode(Page<TaskGroup> page, @Param("projectCode") Long projectCode);
/**
@ -87,4 +67,12 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
List<TaskGroup> listAuthorizedResource(@Param("userId") int userId);
List<TaskGroup> selectByProjectCode(@Param("projectCode") long projectCode);
List<TaskGroup> queryAvailableTaskGroups();
List<TaskGroup> queryUsedTaskGroups();
int acquireTaskGroupSlot(@Param("id") Integer id);
int releaseTaskGroupSlot(@Param("id") Integer id);
}

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java

@ -104,5 +104,22 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") List<Integer> workflowInstanceIds);
void deleteByTaskGroupIds(@Param("taskGroupIds") List<Integer> taskGroupIds);
void updateTaskGroupPriorityByTaskInstanceId(@Param("taskInstanceId") Integer taskInstanceId,
@Param("priority") int taskGroupPriority);
List<TaskGroupQueue> queryAllInQueueTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId,
@Param("inQueue") int inQueue);
List<TaskGroupQueue> queryAllTaskGroupQueueByInQueue(@Param("inQueue") int inQueue);
List<TaskGroupQueue> queryByTaskInstanceId(@Param("taskInstanceId") Integer taskInstanceId);
List<TaskGroupQueue> queryUsingTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId,
@Param("status") int status,
@Param("inQueue") int inQueue,
@Param("forceStart") int forceStart);
}

62
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import java.util.List;
public interface TaskGroupDao extends IDao<TaskGroup> {
/**
* Query all TaskGroups
*
* @return all TaskGroups
*/
List<TaskGroup> queryAllTaskGroups();
/**
* Query all TaskGroups which useSize > 0
*
* @return the TaskGroups which useSize > 0
*/
List<TaskGroup> queryUsedTaskGroups();
/**
* Query all TaskGroups which useSize < groupSize
*
* @return the TaskGroups which useSize < groupSize
*/
List<TaskGroup> queryAvailableTaskGroups();
/**
* Acquire a slot for the TaskGroup which useSize should < groupSize, set the useSize = useSize + 1.
*
* @param taskGroupId taskGroupId which shouldn't be null
* @return true if acquire successfully, false otherwise.
*/
boolean acquireTaskGroupSlot(Integer taskGroupId);
/**
* Release a slot for the TaskGroup which useSize should > 0, set the useSize = useSize - 1.
*
* @param taskGroupId taskGroupId which shouldn't be null
* @return true if release successfully, false otherwise.
*/
boolean releaseTaskGroupSlot(Integer taskGroupId);
}

64
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import java.util.List;
public interface TaskGroupQueueDao extends IDao<TaskGroupQueue> {
/**
* Delete {@link TaskGroupQueue} by {@link ProcessInstance#getId()}
*
* @param workflowInstanceIds workflowInstanceIds
*/
void deleteByWorkflowInstanceIds(List<Integer> workflowInstanceIds);
/**
* Query all {@link TaskGroupQueue} which in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES}
*
* @return TaskGroupQueue ordered by priority desc
*/
List<TaskGroupQueue> queryAllInQueueTaskGroupQueue();
/**
* Query all {@link TaskGroupQueue} which in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} and taskGroupId is taskGroupId
*
* @param taskGroupId taskGroupId
* @return TaskGroupQueue ordered by priority desc
*/
List<TaskGroupQueue> queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId);
/**
* Query all {@link TaskGroupQueue} which taskId is taskInstanceId
*
* @param taskInstanceId taskInstanceId
* @return TaskGroupQueue ordered by priority desc
*/
List<TaskGroupQueue> queryByTaskInstanceId(Integer taskInstanceId);
/**
* Query all {@link TaskGroupQueue} which status is TaskGroupQueueStatus.ACQUIRE_SUCCESS and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#NO}.
*
* @param taskGroupId taskGroupId
* @return TaskGroupQueue
*/
List<TaskGroupQueue> queryAcquiredTaskGroupQueueByGroupId(Integer taskGroupId);
}

70
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.TaskGroupDao;
import java.util.List;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;
@Slf4j
@Repository
public class TaskGroupDaoImpl extends BaseDao<TaskGroup, TaskGroupMapper> implements TaskGroupDao {
public TaskGroupDaoImpl(@NonNull TaskGroupMapper taskGroupMapper) {
super(taskGroupMapper);
}
@Override
public List<TaskGroup> queryAllTaskGroups() {
return mybatisMapper.selectList(null);
}
@Override
public List<TaskGroup> queryUsedTaskGroups() {
return mybatisMapper.queryUsedTaskGroups();
}
@Override
public List<TaskGroup> queryAvailableTaskGroups() {
return mybatisMapper.queryAvailableTaskGroups();
}
@Override
public boolean acquireTaskGroupSlot(Integer taskGroupId) {
if (taskGroupId == null) {
throw new IllegalArgumentException("taskGroupId cannot be null");
}
return mybatisMapper.acquireTaskGroupSlot(taskGroupId) > 0;
}
@Override
public boolean releaseTaskGroupSlot(Integer taskGroupId) {
if (taskGroupId == null) {
throw new IllegalArgumentException("taskGroupId cannot be null");
}
return mybatisMapper.releaseTaskGroupSlot(taskGroupId) > 0;
}
}

73
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import lombok.NonNull;
import org.springframework.stereotype.Repository;
@Repository
public class TaskGroupQueueDaoImpl extends BaseDao<TaskGroupQueue, TaskGroupQueueMapper> implements TaskGroupQueueDao {
public TaskGroupQueueDaoImpl(@NonNull TaskGroupQueueMapper taskGroupQueueMapper) {
super(taskGroupQueueMapper);
}
@Override
public void deleteByWorkflowInstanceIds(List<Integer> workflowInstanceIds) {
if (CollectionUtils.isEmpty(workflowInstanceIds)) {
return;
}
mybatisMapper.deleteByWorkflowInstanceIds(workflowInstanceIds);
}
@Override
public List<TaskGroupQueue> queryAllInQueueTaskGroupQueue() {
return mybatisMapper.queryAllTaskGroupQueueByInQueue(Flag.YES.getCode());
}
@Override
public List<TaskGroupQueue> queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId) {
return mybatisMapper.queryAllInQueueTaskGroupQueueByGroupId(taskGroupId, Flag.YES.getCode());
}
@Override
public List<TaskGroupQueue> queryByTaskInstanceId(Integer taskInstanceId) {
return mybatisMapper.queryByTaskInstanceId(taskInstanceId);
}
@Override
public List<TaskGroupQueue> queryAcquiredTaskGroupQueueByGroupId(Integer taskGroupId) {
return mybatisMapper.queryUsingTaskGroupQueueByGroupId(
taskGroupId,
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode(),
Flag.YES.getCode(),
Flag.NO.getCode());
}
}

58
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml

@ -67,24 +67,6 @@
where project_code = #{projectCode}
</select>
<!--modify data by id-->
<update id="robTaskGroupResource">
update t_ds_task_group
set use_size = use_size + 1
where id = #{id}
and use_size &lt; group_size
and use_size = #{currentUseSize}
and (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus}) = 1
</update>
<!--modify data by id-->
<update id="releaseTaskGroupResource">
update t_ds_task_group
set use_size = use_size-1
where id = #{id} and use_size > 0 and
(select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1
</update>
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
select
<include refid="baseSql" />
@ -93,30 +75,42 @@
user_id = #{userId} and name = #{name}
</select>
<select id="selectAvailableCountById" resultType="java.lang.Integer">
<select id="listAuthorizedResource" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
select
count(1)
<include refid="baseSql" />
from t_ds_task_group
where
id = #{groupId} and use_size &lt; group_size
where 1=1
<if test="userId != 0">
and user_id = #{userId}
</if>
</select>
<select id="selectCountByIdStatus" resultType="java.lang.Integer">
<select id="queryAvailableTaskGroups" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
select
count(1)
<include refid="baseSql"/>
from t_ds_task_group
where
id = #{id} and status = #{status}
where use_size <![CDATA[ < ]]> group_size
</select>
<select id="listAuthorizedResource" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
<select id="queryUsedTaskGroups" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
select
<include refid="baseSql" />
<include refid="baseSql"/>
from t_ds_task_group
where 1=1
<if test="userId != 0">
and user_id = #{userId}
</if>
where use_size <![CDATA[ > ]]> 0
</select>
<update id="acquireTaskGroupSlot">
update t_ds_task_group
set use_size = use_size + 1
where id = #{id}
and use_size &lt; group_size
</update>
<update id="releaseTaskGroupSlot">
update t_ds_task_group
set use_size = use_size - 1
where id = #{id}
and use_size > 0
</update>
</mapper>

61
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml

@ -46,7 +46,7 @@
and group_id = #{groupId}
</if>
</where>
order by update_time desc
order by update_time desc, id desc
</select>
<select id="queryByStatus" resultMap="TaskGroupQueueMap" resultType="map">
@ -126,9 +126,19 @@
<select id="queryTaskGroupQueueByTaskGroupIdPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
queue.id, queue.task_name, queue.group_id, queue.process_id, queue.priority, queue.status
, queue.force_start, queue.create_time, queue.update_time,
process.name as processInstanceName,p.name as projectName,p.code as projectCode
queue.id,
queue.task_name,
queue.group_id,
queue.process_id,
queue.priority,
queue.in_queue,
queue.status,
queue.force_start,
queue.create_time,
queue.update_time,
process.name as processInstanceName,
p.name as projectName,
p.code as projectCode
from t_ds_task_group_queue queue
left join t_ds_process_instance process on queue.process_id = process.id
left join t_ds_process_definition p_f on process.process_definition_code = p_f.code
@ -171,6 +181,15 @@
where process_id = #{workflowInstanceId}
</delete>
<delete id="deleteByWorkflowInstanceIds">
delete
from t_ds_task_group_queue
where process_id in
<foreach collection="workflowInstanceIds" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</delete>
<delete id="deleteByTaskGroupIds">
delete
from t_ds_task_group_queue
@ -180,4 +199,38 @@
</foreach>
</delete>
<update id="updateTaskGroupPriorityByTaskInstanceId">
update t_ds_task_group_queue
set priority = #{priority}
where task_id = #{taskInstanceId}
</update>
<select id="queryAllInQueueTaskGroupQueueByGroupId" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
<include refid="baseSql" />
from t_ds_task_group_queue
where group_id = #{taskGroupId} and in_queue = #{inQueue} order by priority desc
</select>
<select id="queryAllTaskGroupQueueByInQueue" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
<include refid="baseSql" />
from t_ds_task_group_queue
where in_queue = #{inQueue} order by priority desc
</select>
<select id="queryByTaskInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
<include refid="baseSql" />
from t_ds_task_group_queue
where task_id = #{taskInstanceId}
</select>
<select id="queryUsingTaskGroupQueueByGroupId" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
<include refid="baseSql" />
from t_ds_task_group_queue
where group_id = #{taskGroupId} and status = #{status} and force_start = #{forceStart} and in_queue = #{inQueue}
</select>
</mapper>

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -1976,6 +1976,7 @@ CREATE TABLE t_ds_task_group_queue
in_queue int(4) DEFAULT '0' ,
create_time datetime DEFAULT NULL ,
update_time datetime DEFAULT NULL ,
KEY idx_t_ds_task_group_queue_in_queue (in_queue) ,
PRIMARY KEY (id)
);

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -1964,6 +1964,7 @@ CREATE TABLE `t_ds_task_group_queue` (
`in_queue` tinyint(4) DEFAULT '0' COMMENT 'ready to get the queue by other task finish 0 NO ,1 YES',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
KEY `idx_t_ds_task_group_queue_in_queue` (`in_queue`),
PRIMARY KEY( `id` )
)ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8 COLLATE = utf8_bin;

2
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -1946,6 +1946,8 @@ CREATE TABLE t_ds_task_group_queue (
PRIMARY KEY (id)
);
create index idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue(in_queue);
--
-- Table structure for table t_ds_task_group
--

17
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql

@ -109,4 +109,19 @@ ALTER TABLE `t_ds_process_definition_log` MODIFY COLUMN `version` int NOT NULL D
ALTER TABLE `t_ds_process_instance` MODIFY COLUMN `process_definition_version` int NOT NULL DEFAULT 1 COMMENT "process definition version";
ALTER TABLE `t_ds_task_definition` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version";
ALTER TABLE `t_ds_task_definition_log` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version";
ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version";
ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version";
-- create idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue
DROP PROCEDURE IF EXISTS create_idx_t_ds_task_group_queue_in_queue;
delimiter d//
CREATE PROCEDURE create_idx_t_ds_task_group_queue_in_queue()
BEGIN
DECLARE index_exists INT DEFAULT 0;
SELECT COUNT(*) INTO index_exists FROM information_schema.statistics WHERE table_schema = (SELECT DATABASE()) AND table_name = 't_ds_task_group_queue' AND index_name = 'idx_t_ds_task_group_queue_in_queue';
IF index_exists = 0 THEN CREATE INDEX idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue);
END IF;
END;
d//
delimiter ;
CALL create_idx_t_ds_task_group_queue_in_queue;
DROP PROCEDURE create_idx_t_ds_task_group_queue_in_queue;

4
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql

@ -29,4 +29,6 @@ ALTER TABLE "t_ds_process_definition_log" ALTER COLUMN "version" SET DEFAULT 1;
ALTER TABLE "t_ds_task_definition" ALTER COLUMN "version" SET DEFAULT 1;
ALTER TABLE "t_ds_task_definition_log" ALTER COLUMN "version" SET DEFAULT 1;
ALTER TABLE "t_ds_process_instance" ALTER COLUMN "process_definition_version" SET NOT NULL, ALTER COLUMN "process_definition_version" SET DEFAULT 1;
ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1;
ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1;
CREATE INDEX IF NOT EXISTS idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue);

17
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql

@ -80,4 +80,19 @@ ALTER TABLE `t_ds_process_definition_log` MODIFY COLUMN `version` int NOT NULL D
ALTER TABLE `t_ds_process_instance` MODIFY COLUMN `process_definition_version` int NOT NULL DEFAULT 1 COMMENT "process definition version";
ALTER TABLE `t_ds_task_definition` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version";
ALTER TABLE `t_ds_task_definition_log` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version";
ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version";
ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version";
-- create idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue
DROP PROCEDURE IF EXISTS create_idx_t_ds_task_group_queue_in_queue;
delimiter d//
CREATE PROCEDURE create_idx_t_ds_task_group_queue_in_queue()
BEGIN
DECLARE index_exists INT DEFAULT 0;
SELECT COUNT(*) INTO index_exists FROM information_schema.statistics WHERE table_schema = (SELECT DATABASE()) AND table_name = 't_ds_task_group_queue' AND index_name = 'idx_t_ds_task_group_queue_in_queue';
IF index_exists = 0 THEN CREATE INDEX idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue);
END IF;
END;
d//
delimiter ;
CALL create_idx_t_ds_task_group_queue_in_queue;
DROP PROCEDURE create_idx_t_ds_task_group_queue_in_queue;

4
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -68,4 +68,6 @@ ALTER TABLE "t_ds_process_definition_log" ALTER COLUMN "version" SET DEFAULT 1;
ALTER TABLE "t_ds_task_definition" ALTER COLUMN "version" SET DEFAULT 1;
ALTER TABLE "t_ds_task_definition_log" ALTER COLUMN "version" SET DEFAULT 1;
ALTER TABLE "t_ds_process_instance" ALTER COLUMN "process_definition_version" SET NOT NULL, ALTER COLUMN "process_definition_version" SET DEFAULT 1;
ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1;
ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1;
CREATE INDEX IF NOT EXISTS idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue);

9
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
@ -24,8 +25,6 @@ import java.util.Date;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.baomidou.mybatisplus.core.metadata.IPage;
@ -33,8 +32,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
public class TaskGroupMapperTest extends BaseDaoTest {
private static final Logger logger = LoggerFactory.getLogger(TaskGroupMapperTest.class);
@Autowired
TaskGroupMapper taskGroupMapper;
@ -46,7 +43,7 @@ public class TaskGroupMapperTest extends BaseDaoTest {
taskGroup.setName("task group");
taskGroup.setId(1);
taskGroup.setUserId(1);
taskGroup.setStatus(1);
taskGroup.setStatus(Flag.YES);
taskGroup.setGroupSize(10);
taskGroup.setDescription("this is a task group");
Date date = new Date(System.currentTimeMillis());
@ -88,7 +85,7 @@ public class TaskGroupMapperTest extends BaseDaoTest {
Page<TaskGroup> page = new Page(1, 3);
IPage<TaskGroup> taskGroupIPage = taskGroupMapper.queryTaskGroupPaging(
page,
taskGroup.getName(), taskGroup.getStatus());
taskGroup.getName(), taskGroup.getStatus().getCode());
Assertions.assertEquals(taskGroupIPage.getTotal(), 1);
}

118
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImplTest.java

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.repository.TaskGroupDao;
import java.util.Date;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
class TaskGroupDaoImplTest extends BaseDaoTest {
@Autowired
private TaskGroupDao taskGroupDao;
@Test
void queryAllTaskGroups() {
TaskGroup taskGroup = createTaskGroup("test", 0, 1);
taskGroupDao.insert(taskGroup);
List<TaskGroup> taskGroups = taskGroupDao.queryAllTaskGroups();
assertEquals(1, taskGroups.size());
}
@Test
void queryUsedTaskGroups() {
// Insert a unused task group
TaskGroup taskGroup = createTaskGroup("testUnused", 0, 1);
taskGroupDao.insert(taskGroup);
assertEquals(0, taskGroupDao.queryUsedTaskGroups().size());
// Insert a used task group
taskGroup = createTaskGroup("testUsed", 1, 1);
taskGroupDao.insert(taskGroup);
assertEquals(1, taskGroupDao.queryUsedTaskGroups().size());
}
@Test
void queryAvailableTaskGroups() {
// Insert a full task group
TaskGroup taskGroup = createTaskGroup("testFull", 1, 1);
taskGroupDao.insert(taskGroup);
assertEquals(0, taskGroupDao.queryAvailableTaskGroups().size());
// Insert a used task group
taskGroup = createTaskGroup("testNotFull", 0, 1);
taskGroupDao.insert(taskGroup);
assertEquals(1, taskGroupDao.queryAvailableTaskGroups().size());
}
@Test
void acquireTaskGroupSlot() {
// Insert a full task group will acquire failed
TaskGroup taskGroup = createTaskGroup("testFull", 1, 1);
taskGroupDao.insert(taskGroup);
assertFalse(taskGroupDao.acquireTaskGroupSlot(taskGroup.getId()));
taskGroup.setUseSize(0);
taskGroupDao.updateById(taskGroup);
assertTrue(taskGroupDao.acquireTaskGroupSlot(taskGroup.getId()));
taskGroup = taskGroupDao.queryById(taskGroup.getId());
assertEquals(1, taskGroup.getUseSize());
}
@Test
void releaseTaskGroupSlot() {
// Insert an empty task group will release failed
TaskGroup taskGroup = createTaskGroup("testEmpty", 0, 1);
taskGroupDao.insert(taskGroup);
assertFalse(taskGroupDao.releaseTaskGroupSlot(taskGroup.getId()));
taskGroup.setUseSize(1);
taskGroupDao.updateById(taskGroup);
assertTrue(taskGroupDao.releaseTaskGroupSlot(taskGroup.getId()));
taskGroup = taskGroupDao.queryById(taskGroup.getId());
assertEquals(0, taskGroup.getUseSize());
}
private TaskGroup createTaskGroup(String name, int useSize, int groupSize) {
return TaskGroup.builder()
.name(name)
.description("test")
.groupSize(groupSize)
.useSize(useSize)
.userId(1)
.status(Flag.YES)
.createTime(new Date())
.updateTime(new Date())
.projectCode(1)
.build();
}
}

108
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao;
import java.util.Date;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
class TaskGroupQueueDaoImplTest extends BaseDaoTest {
@Autowired
private TaskGroupQueueDao taskGroupQueueDao;
@Test
void deleteByWorkflowInstanceIds() {
TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
taskGroupQueueDao.insert(taskGroupQueue);
assertNotNull(taskGroupQueueDao.queryById(taskGroupQueue.getId()));
taskGroupQueueDao.deleteByWorkflowInstanceIds(Lists.newArrayList(1));
assertNull(taskGroupQueueDao.queryById(taskGroupQueue.getId()));
}
@Test
void queryAllInQueueTaskGroupQueue() {
TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
taskGroupQueueDao.insert(taskGroupQueue);
assertEquals(1, taskGroupQueueDao.queryAllInQueueTaskGroupQueue().size());
}
@Test
void queryAllInQueueTaskGroupQueueByGroupId() {
TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
taskGroupQueueDao.insert(taskGroupQueue);
assertEquals(1, taskGroupQueueDao.queryAllInQueueTaskGroupQueueByGroupId(1).size());
}
@Test
void updateById() {
TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.WAIT_QUEUE);
taskGroupQueueDao.insert(taskGroupQueue);
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
taskGroupQueueDao.updateById(taskGroupQueue);
assertEquals(TaskGroupQueueStatus.ACQUIRE_SUCCESS,
taskGroupQueueDao.queryById(taskGroupQueue.getId()).getStatus());
}
@Test
void queryByTaskInstanceId() {
TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
taskGroupQueueDao.insert(taskGroupQueue);
assertEquals(1, taskGroupQueueDao.queryByTaskInstanceId(1).size());
}
@Test
void queryUsingTaskGroupQueueByGroupId() {
TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
taskGroupQueueDao.insert(taskGroupQueue);
assertEquals(1, taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(1).size());
taskGroupQueue = createTaskGroupQueue(Flag.YES, TaskGroupQueueStatus.WAIT_QUEUE);
taskGroupQueueDao.insert(taskGroupQueue);
assertEquals(1, taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(1).size());
}
private TaskGroupQueue createTaskGroupQueue(Flag forceStart, TaskGroupQueueStatus taskGroupQueueStatus) {
return TaskGroupQueue.builder()
.taskId(1)
.taskName("test")
.groupId(1)
.processId(1)
.priority(0)
.forceStart(forceStart.getCode())
.inQueue(Flag.YES.getCode())
.status(taskGroupQueueStatus)
.createTime(new Date())
.updateTime(new Date())
.build();
}
}

9
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java

@ -25,9 +25,6 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartResponse;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest;
@RpcService
public interface ILogicTaskInstanceOperator {
@ -41,10 +38,4 @@ public interface ILogicTaskInstanceOperator {
@RpcMethod
LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequest);
@RpcMethod
TaskInstanceForceStartResponse forceStartTaskInstance(TaskInstanceForceStartRequest taskForceStartRequest);
@RpcMethod
void wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest);
}

5
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java

@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.extract.master;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse;
@RpcService
public interface IWorkflowInstanceService {
@ -30,4 +32,7 @@ public interface IWorkflowInstanceService {
@RpcMethod
WorkflowExecuteDto getWorkflowExecutingData(Integer workflowInstanceId);
@RpcMethod
TaskInstanceWakeupResponse wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest);
}

42
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.master.transportor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class TaskInstanceForceStartRequest {
private String key;
private int processInstanceId;
private int taskInstanceId;
public TaskInstanceForceStartRequest(
int processInstanceId,
int taskInstanceId) {
this.key = String.format("%d-%d", processInstanceId, taskInstanceId);
this.processInstanceId = processInstanceId;
this.taskInstanceId = taskInstanceId;
}
}

41
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java

@ -1,41 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.master.transportor;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskInstanceForceStartResponse {
private boolean success;
private String message;
public static TaskInstanceForceStartResponse success() {
return new TaskInstanceForceStartResponse(true, "dispatch success");
}
public static TaskInstanceForceStartResponse failed(String message) {
return new TaskInstanceForceStartResponse(false, message);
}
}

19
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java

@ -17,26 +17,21 @@
package org.apache.dolphinscheduler.extract.master.transportor;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
public class TaskInstanceWakeupRequest {
private String key;
@AllArgsConstructor
public class TaskInstanceWakeupRequest implements Serializable {
private int processInstanceId;
private int taskInstanceId;
public TaskInstanceWakeupRequest(
int processInstanceId,
int taskInstanceId) {
this.key = String.format("%d-%d", processInstanceId, taskInstanceId);
this.processInstanceId = processInstanceId;
this.taskInstanceId = taskInstanceId;
}
}

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import javax.annotation.PostConstruct;
@ -84,6 +85,9 @@ public class MasterServer implements IStoppable {
@Autowired
private MasterSlotManager masterSlotManager;
@Autowired
private TaskGroupCoordinator taskGroupCoordinator;
public static void main(String[] args) {
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
@ -115,6 +119,7 @@ public class MasterServer implements IStoppable {
this.failoverExecuteThread.start();
this.schedulerApi.start();
this.taskGroupCoordinator.start();
MasterServerMetrics.registerMasterCpuUsageGauge(() -> {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java

@ -24,14 +24,20 @@ import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class StateEventHandlerManager {
private static final Map<StateEventType, StateEventHandler> stateEventHandlerMap = new HashMap<>();
static {
ServiceLoader.load(StateEventHandler.class)
.forEach(stateEventHandler -> stateEventHandlerMap.put(stateEventHandler.getEventType(),
stateEventHandler));
.forEach(stateEventHandler -> {
log.info("Initialize StateEventHandler: {} for eventType: {}",
stateEventHandler.getClass().getName(), stateEventHandler.getEventType());
stateEventHandlerMap.put(stateEventHandler.getEventType(), stateEventHandler);
});
}
public static Optional<StateEventHandler> getStateEventHandler(StateEventType stateEventType) {

11
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java

@ -62,7 +62,8 @@ public class TaskStateEventHandler implements StateEventHandler {
if (task.getState().isFinished()
&& (taskStateEvent.getStatus() != null && taskStateEvent.getStatus().isRunning())) {
String errorMessage = String.format(
"The current task instance state is %s, but the task state event status is %s, so the task state event will be ignored",
"The current TaskInstance: %s state is %s, but the task state event status is %s, so the task state event will be ignored",
task.getName(),
task.getState().name(),
taskStateEvent.getStatus().name());
log.warn(errorMessage);
@ -75,14 +76,6 @@ public class TaskStateEventHandler implements StateEventHandler {
return true;
}
workflowExecuteRunnable.taskFinished(task);
if (task.getTaskGroupId() > 0) {
log.info("The task instance need to release task Group: {}", task.getTaskGroupId());
try {
workflowExecuteRunnable.releaseTaskGroup(task);
} catch (Exception e) {
throw new StateEventHandleException("Release task group failed", e);
}
}
return true;
}
return true;

47
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import lombok.extern.slf4j.Slf4j;
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
@Slf4j
public class TaskWaitTaskGroupStateHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
StateEvent stateEvent) {
log.info("Handle task instance wait task group event, taskInstanceId: {}", stateEvent.getTaskInstanceId());
if (workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) {
log.info("Success wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId());
} else {
log.info("Failed to wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId());
}
return true;
}
@Override
public StateEventType getEventType() {
return StateEventType.WAKE_UP_TASK_GROUP;
}
}

13
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java

@ -32,12 +32,6 @@ public class LogicTaskInstanceOperationFunctionManager {
@Autowired
private LogicITaskInstancePauseOperationFunction logicITaskInstancePauseOperationFunction;
@Autowired
private TaskInstanceForceStartOperationFunction taskInstanceForceStartOperationFunction;
@Autowired
private TaskInstanceWakeupOperationFunction taskInstanceWakeupOperationFunction;
public LogicITaskInstanceDispatchOperationFunction getLogicTaskInstanceDispatchOperationFunction() {
return logicITaskInstanceDispatchOperationFunction;
}
@ -50,11 +44,4 @@ public class LogicTaskInstanceOperationFunctionManager {
return logicITaskInstancePauseOperationFunction;
}
public TaskInstanceForceStartOperationFunction getTaskInstanceForceStartOperationFunction() {
return taskInstanceForceStartOperationFunction;
}
public TaskInstanceWakeupOperationFunction getTaskInstanceWakeupOperationFunction() {
return taskInstanceWakeupOperationFunction;
}
}

14
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java

@ -24,9 +24,6 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartResponse;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest;
import lombok.extern.slf4j.Slf4j;
@ -58,15 +55,4 @@ public class LogicTaskInstanceOperatorImpl implements ILogicTaskInstanceOperator
.operate(taskPauseRequest);
}
@Override
public TaskInstanceForceStartResponse forceStartTaskInstance(TaskInstanceForceStartRequest taskForceStartRequest) {
return logicTaskInstanceOperationFunctionManager.getTaskInstanceForceStartOperationFunction()
.operate(taskForceStartRequest);
}
@Override
public void wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest) {
logicTaskInstanceOperationFunctionManager.getTaskInstanceWakeupOperationFunction().operate(taskWakeupRequest);
}
}

58
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TaskInstanceForceStartOperationFunction
implements
ITaskInstanceOperationFunction<TaskInstanceForceStartRequest, TaskInstanceForceStartResponse> {
@Autowired
private StateEventResponseService stateEventResponseService;
@Override
public TaskInstanceForceStartResponse operate(TaskInstanceForceStartRequest taskInstanceForceStartRequest) {
TaskStateEvent stateEvent = TaskStateEvent.builder()
.processInstanceId(taskInstanceForceStartRequest.getProcessInstanceId())
.taskInstanceId(taskInstanceForceStartRequest.getTaskInstanceId())
.key(taskInstanceForceStartRequest.getKey())
.type(StateEventType.WAKE_UP_TASK_GROUP)
.build();
try {
LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
log.info("Received forceStartTaskInstance, event: {}", stateEvent);
stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
return TaskInstanceForceStartResponse.success();
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java

@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import lombok.extern.slf4j.Slf4j;
@ -36,20 +36,30 @@ public class TaskInstanceWakeupOperationFunction
ITaskInstanceOperationFunction<TaskInstanceWakeupRequest, TaskInstanceWakeupResponse> {
@Autowired
private StateEventResponseService stateEventResponseService;
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Override
public TaskInstanceWakeupResponse operate(TaskInstanceWakeupRequest taskInstanceWakeupRequest) {
TaskStateEvent stateEvent = TaskStateEvent.builder()
.processInstanceId(taskInstanceWakeupRequest.getProcessInstanceId())
.taskInstanceId(taskInstanceWakeupRequest.getTaskInstanceId())
.key(taskInstanceWakeupRequest.getKey())
.type(StateEventType.WAKE_UP_TASK_GROUP)
.build();
try {
LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
log.info("Received wakeupTaskInstance request, event: {}", stateEvent);
stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
log.info("Received TaskInstanceWakeupRequest request{}", taskInstanceWakeupRequest);
int workflowInstanceId = taskInstanceWakeupRequest.getProcessInstanceId();
int taskInstanceId = taskInstanceWakeupRequest.getTaskInstanceId();
LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstanceId, taskInstanceId);
WorkflowExecuteRunnable workflowExecuteRunnable =
processInstanceExecCacheManager.getByProcessInstanceId(workflowInstanceId);
if (workflowExecuteRunnable == null) {
log.warn("cannot find WorkflowExecuteRunnable: {}, no need to Wakeup task", workflowInstanceId);
return TaskInstanceWakeupResponse.failed("cannot find WorkflowExecuteRunnable: " + workflowInstanceId);
}
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable =
workflowExecuteRunnable.getTaskExecuteRunnableById(taskInstanceId).orElse(null);
if (defaultTaskExecuteRunnable == null) {
log.warn("Cannot find DefaultTaskExecuteRunnable: {}, cannot Wakeup task", taskInstanceId);
return TaskInstanceWakeupResponse.failed("Cannot find DefaultTaskExecuteRunnable: " + taskInstanceId);
}
defaultTaskExecuteRunnable.dispatch();
log.info("Success Wakeup TaskInstance: {}", taskInstanceId);
return TaskInstanceWakeupResponse.success();
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.service.ExecutingService;
@ -36,6 +38,9 @@ public class WorkflowInstanceServiceImpl implements IWorkflowInstanceService {
@Autowired
private ExecutingService executingService;
@Autowired
private TaskInstanceWakeupOperationFunction taskInstanceWakeupOperationFunction;
@Override
public void clearWorkflowMetrics(Long workflowDefinitionCode) {
log.info("Receive clearWorkflowMetrics request: {}", workflowDefinitionCode);
@ -49,4 +54,9 @@ public class WorkflowInstanceServiceImpl implements IWorkflowInstanceService {
executingService.queryWorkflowExecutingData(workflowInstanceId);
return workflowExecuteDtoOptional.orElse(null);
}
@Override
public TaskInstanceWakeupResponse wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest) {
return taskInstanceWakeupOperationFunction.operate(taskWakeupRequest);
}
}

147
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
@ -52,14 +51,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
@ -81,6 +77,7 @@ import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator;
import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
@ -114,7 +111,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.NonNull;
@ -227,6 +223,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
private final ListenerEventAlertManager listenerEventAlertManager;
private final TaskGroupCoordinator taskGroupCoordinator;
public WorkflowExecuteRunnable(
@NonNull IWorkflowExecuteContext workflowExecuteContext,
@NonNull CommandService commandService,
@ -238,7 +236,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
@NonNull CuringParamsService curingParamsService,
@NonNull TaskInstanceDao taskInstanceDao,
@NonNull DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory,
@NonNull ListenerEventAlertManager listenerEventAlertManager) {
@NonNull ListenerEventAlertManager listenerEventAlertManager,
@NonNull TaskGroupCoordinator taskGroupCoordinator) {
this.processService = processService;
this.commandService = commandService;
this.processInstanceDao = processInstanceDao;
@ -250,6 +249,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
this.taskInstanceDao = taskInstanceDao;
this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory;
this.listenerEventAlertManager = listenerEventAlertManager;
this.taskGroupCoordinator = taskGroupCoordinator;
TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size);
}
@ -339,43 +339,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
return this.stateEvents.size();
}
public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
log.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId());
TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId());
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable =
taskExecuteRunnableMap.get(taskInstance.getTaskCode());
if (defaultTaskExecuteRunnable != null) {
defaultTaskExecuteRunnable.dispatch();
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
log.info("Success force start task: {}, taskGroup: {}", taskGroupQueue.getTaskName(),
taskGroupQueue.getGroupId());
} else {
log.warn("Cannot find the TaskExecuteRunnable: {}", taskGroupQueue.getTaskName());
}
return true;
}
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
log.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId());
boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
if (acquireTaskGroup) {
TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId());
taskExecuteRunnableMap.get(taskInstance.getTaskCode()).dispatch();
log.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId());
return true;
}
log.warn("Failed to wake up taskGroupQueue, taskGroupQueueId: {}", taskGroupQueue.getId());
return false;
} else {
log.info(
"Failed to wake up the taskGroupQueue: {}, since the taskGroupQueue is not in queue, will no need to wake up.",
taskGroupQueue);
return true;
}
}
public void processStart() {
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
@ -408,6 +371,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
taskExecuteRunnableMap.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(workflowInstance, taskInstance);
stateWheelExecuteThread.removeTask4RetryCheck(workflowInstance, taskInstance);
if (taskInstance.getTaskGroupId() > 0) {
releaseTaskGroupIfNeeded(taskInstance);
log.info("Release task Group slot: {} for taskInstance: {} ", taskInstance.getTaskGroupId(),
taskInstance.getId());
}
if (taskInstance.getState().isSuccess()) {
completeTaskSet.add(taskInstance.getTaskCode());
@ -462,51 +430,16 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
}
}
/**
* release task group
*
*/
public void releaseTaskGroup(TaskInstance taskInstance) {
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
private void releaseTaskGroupIfNeeded(TaskInstance taskInstance) {
// todo: use Integer
if (taskInstance.getTaskGroupId() <= 0) {
log.info("The current TaskInstance: {} doesn't use taskGroup, no need to release taskGroup",
taskInstance.getName());
return;
}
TaskInstance nextTaskInstance = processService.releaseTaskGroup(taskInstance);
if (nextTaskInstance == null) {
log.info(
"The current TaskInstance: {} is the last taskInstance in the taskGroup, no need to wakeup next taskInstance",
taskInstance.getName());
return;
}
if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) {
TaskStateEvent nextEvent = TaskStateEvent.builder()
.processInstanceId(workflowInstance.getId())
.taskInstanceId(nextTaskInstance.getId())
.type(StateEventType.WAKE_UP_TASK_GROUP)
.build();
stateEvents.add(nextEvent);
} else {
ProcessInstance processInstance =
processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
if (processInstance == null) {
log.error("WorkflowInstance is null cannot wakeup, processInstanceId:{}",
nextTaskInstance.getProcessInstanceId());
return;
}
if (processInstance.getHost() == null || Constants.NULL.equals(processInstance.getHost())) {
log.warn("The next WorkflowInstance: {} host is null no need to wakeup, maybe it is in failover",
processInstance);
return;
}
ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class);
taskInstanceOperator.wakeupTaskInstance(
new TaskInstanceWakeupRequest(processInstance.getId(), nextTaskInstance.getId()));
}
log.info("Success send wakeup message to next taskInstance: {}", nextTaskInstance.getId());
taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
log.info("Success release task Group slot: {} for taskInstance: {} ", taskInstance.getTaskGroupId(),
taskInstance.getName());
}
/**
@ -790,10 +723,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
} else {
listenerEventAlertManager.publishProcessFailListenerEvent(workflowInstance, projectUser);
}
if (checkTaskQueue()) {
// release task group
processService.releaseAllTaskGroup(workflowInstance.getId());
}
taskInstanceMap.forEach((id, taskInstance) -> {
if (taskInstance != null && taskInstance.getTaskGroupId() > 0) {
releaseTaskGroupIfNeeded(taskInstance);
}
});
// Log the workflowInstance in detail
log.info(WorkflowInstanceUtils.logWorkflowInstanceInDetails(workflowInstance));
}
@ -1006,19 +940,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
// it will be wakeup when other tasks release the resource.
int taskGroupId = taskInstance.getTaskGroupId();
if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
taskInstance.getName(),
taskGroupId,
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
log.info(
"Submitted task will not be dispatch right now because the first time to try to acquire"
+
" task group failed, taskInstanceName: {}, taskGroupId: {}",
taskInstance.getName(), taskGroupId);
return true;
}
taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
log.info("The TaskInstance: {} use taskGroup: {} to manage the resource, will wait to notify it",
taskInstance,
taskGroupId);
return true;
}
// 4. submit to dispatch queue
tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable);
@ -1371,7 +1297,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
// set the task instance state to fault tolerance
existTaskInstance.setFlag(Flag.NO);
existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
releaseTaskGroup(existTaskInstance);
releaseTaskGroupIfNeeded(existTaskInstance);
validTaskMap.remove(existTaskInstance.getTaskCode());
taskInstanceDao.updateById(existTaskInstance);
@ -2090,16 +2016,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
return recoveryNodeCodeList;
}
private boolean checkTaskQueue() {
AtomicBoolean result = new AtomicBoolean(false);
taskInstanceMap.forEach((id, taskInstance) -> {
if (taskInstance != null && taskInstance.getTaskGroupId() > 0) {
result.set(true);
}
});
return result.get();
}
private boolean isNewProcessInstance() {
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
if (Flag.YES.equals(workflowInstance.getRecovery())) {
@ -2126,6 +2042,17 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
return taskExecuteRunnableMap;
}
public Optional<DefaultTaskExecuteRunnable> getTaskExecuteRunnableById(Integer taskInstanceId) {
if (taskInstanceId == null) {
throw new IllegalArgumentException("taskInstanceId can't be null");
}
TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
if (taskInstance == null) {
return Optional.empty();
}
return Optional.ofNullable(taskExecuteRunnableMap.get(taskInstance.getTaskCode()));
}
public Map<Long, TaskInstance> getWaitToRetryTaskInstanceMap() {
return waitToRetryTaskInstanceMap;
}

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator;
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.command.CommandService;
@ -73,6 +74,9 @@ public class WorkflowExecuteRunnableFactory {
@Autowired
private ListenerEventAlertManager listenerEventAlertManager;
@Autowired
private TaskGroupCoordinator taskGroupCoordinator;
public Optional<WorkflowExecuteRunnable> createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException {
try {
Optional<IWorkflowExecuteContext> workflowExecuteRunnableContextOptional =
@ -88,9 +92,10 @@ public class WorkflowExecuteRunnableFactory {
curingGlobalParamsService,
taskInstanceDao,
defaultTaskExecuteRunnableFactory,
listenerEventAlertManager));
listenerEventAlertManager,
taskGroupCoordinator));
} catch (Exception ex) {
throw new WorkflowCreateException("Create workflow execute runnable failed", ex);
throw new WorkflowCreateException("Create WorkflowExecuteRunnable failed", ex);
}
}

451
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java

@ -0,0 +1,451 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.taskgroup;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskGroupDao;
import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* The TaskGroupCoordinator use to manage the task group slot. The task group slot is used to limit the number of {@link TaskInstance} that can be run at the same time.
* <p>
* The {@link TaskGroupQueue} is used to represent the task group slot. When a {@link TaskGroupQueue} which inQueue is YES means the {@link TaskGroupQueue} is using by a {@link TaskInstance}.
* <p>
* When the {@link TaskInstance} need to use task group, we should use @{@link TaskGroupCoordinator#acquireTaskGroupSlot(TaskInstance)} to acquire the task group slot,
* this method doesn't block should always acquire successfully, and you should directly stop dispatch the task instance.
* When the task group slot is available, the TaskGroupCoordinator will wake up the waiting {@link TaskInstance} to dispatch.
* <pre>
* if(needAcquireTaskGroupSlot(taskInstance)) {
* taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
* return;
* }
* </pre>
* <p>
* When the {@link TaskInstance} is finished, we should use @{@link TaskGroupCoordinator#releaseTaskGroupSlot(TaskInstance)} to release the task group slot.
* <pre>
* if(needToReleaseTaskGroupSlot(taskInstance)) {
* taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
* }
* </pre>
*/
@Slf4j
@Component
public class TaskGroupCoordinator extends BaseDaemonThread {
@Autowired
private RegistryClient registryClient;
@Autowired
private TaskGroupDao taskGroupDao;
@Autowired
private TaskGroupQueueDao taskGroupQueueDao;
@Autowired
private TaskInstanceDao taskInstanceDao;
@Autowired
private ProcessInstanceDao processInstanceDao;
public TaskGroupCoordinator() {
super("TaskGroupCoordinator");
}
@Override
public synchronized void start() {
log.info("TaskGroupCoordinator starting...");
super.start();
log.info("TaskGroupCoordinator started...");
}
@Override
public void run() {
while (!ServerLifeCycleManager.isStopped()) {
try {
if (!ServerLifeCycleManager.isRunning()) {
continue;
}
try {
registryClient.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted();
amendTaskGroupUseSize();
amendTaskGroupQueueStatus();
dealWithForceStartTaskGroupQueue();
dealWithWaitingTaskGroupQueue();
taskGroupCoordinatorRoundTimeCost.stop();
log.info("TaskGroupCoordinator round time cost: {}/ms",
taskGroupCoordinatorRoundTimeCost.getTime());
} finally {
registryClient.releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
}
} catch (Throwable e) {
log.error("TaskGroupCoordinator error", e);
} finally {
// sleep 5s
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5);
}
}
}
/**
* Make sure the TaskGroup useSize is equal to the TaskGroupQueue which status is {@link TaskGroupQueueStatus#ACQUIRE_SUCCESS} and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#NO}.
*/
private void amendTaskGroupUseSize() {
// The TaskGroup useSize should equal to the TaskGroupQueue which inQueue is YES and forceStart is NO
List<TaskGroup> taskGroups = taskGroupDao.queryAllTaskGroups();
if (CollectionUtils.isEmpty(taskGroups)) {
return;
}
for (TaskGroup taskGroup : taskGroups) {
List<TaskGroupQueue> taskGroupQueues =
taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(taskGroup.getId());
int actualUseSize = taskGroupQueues.size();
if (taskGroup.getUseSize() == actualUseSize) {
continue;
}
log.warn("The TaskGroup: {} useSize is {}, but the actual use size is {}, will amend it",
taskGroup.getName(),
taskGroup.getUseSize(), actualUseSize);
taskGroup.setUseSize(actualUseSize);
taskGroupDao.updateById(taskGroup);
}
}
/**
* Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished.
*/
private void amendTaskGroupQueueStatus() {
List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue();
List<Integer> taskInstanceIds = taskGroupQueues.stream()
.map(TaskGroupQueue::getTaskId)
.collect(Collectors.toList());
Map<Integer, TaskInstance> taskInstanceMap = taskInstanceDao.queryByIds(taskInstanceIds)
.stream()
.collect(Collectors.toMap(TaskInstance::getId, Function.identity()));
for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
int taskId = taskGroupQueue.getTaskId();
TaskInstance taskInstance = taskInstanceMap.get(taskId);
if (taskInstance == null) {
log.warn("The TaskInstance: {} is not exist, will release the TaskGroupQueue: {}", taskId,
taskGroupQueue);
releaseTaskGroupQueueSlot(taskGroupQueue);
continue;
}
if (taskInstance.getState().isFinished()) {
log.warn("The TaskInstance: {} state: {} finished, will release the TaskGroupQueue: {}",
taskInstance.getName(), taskInstance.getState(), taskGroupQueue);
releaseTaskGroupQueueSlot(taskGroupQueue);
continue;
}
}
}
private void dealWithForceStartTaskGroupQueue() {
// Find the force start task group queue(Which is inQueue and forceStart is YES)
// Notify the related waiting task instance
// Set the taskGroupQueue status to RELEASE and remove it from queue
List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue()
.stream()
.filter(taskGroupQueue -> Flag.YES.getCode() == taskGroupQueue.getForceStart())
.collect(Collectors.toList());
for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
try {
LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId());
// notify the waiting task instance
// We notify first, it notify failed, the taskGroupQueue will be in queue, and then we will retry it
// next time.
notifyWaitingTaskInstance(taskGroupQueue);
log.info("Notify the ForceStart waiting TaskInstance: {} for taskGroupQueue: {} success",
taskGroupQueue.getTaskName(),
taskGroupQueue.getId());
taskGroupQueue.setInQueue(Flag.NO.getCode());
taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE);
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueDao.updateById(taskGroupQueue);
log.info("Release the force start TaskGroupQueue {}", taskGroupQueue);
} catch (UnsupportedOperationException unsupportedOperationException) {
releaseTaskGroupQueueSlot(taskGroupQueue);
log.info(
"Notify the ForceStart TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue",
taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException);
} catch (Throwable throwable) {
log.info("Notify the force start TaskGroupQueue {} failed", taskGroupQueue, throwable);
} finally {
LogUtils.removeTaskInstanceIdMDC();
}
}
}
private void dealWithWaitingTaskGroupQueue() {
// Find the TaskGroup which usage < maxSize.
// Find the highest priority inQueue task group queue(Which is inQueue and status is Waiting and force start is
// NO) belong to the
// task group.
List<TaskGroup> taskGroups = taskGroupDao.queryAvailableTaskGroups();
if (CollectionUtils.isEmpty(taskGroups)) {
log.debug("There is no available task group");
return;
}
for (TaskGroup taskGroup : taskGroups) {
int availableSize = taskGroup.getGroupSize() - taskGroup.getUseSize();
if (availableSize <= 0) {
log.info("TaskGroup {} is full, available size is {}", taskGroup, availableSize);
continue;
}
List<TaskGroupQueue> taskGroupQueues =
taskGroupQueueDao.queryAllInQueueTaskGroupQueueByGroupId(taskGroup.getId())
.stream()
.filter(taskGroupQueue -> Flag.NO.getCode() == taskGroupQueue.getForceStart())
.filter(taskGroupQueue -> TaskGroupQueueStatus.WAIT_QUEUE == taskGroupQueue.getStatus())
.limit(availableSize)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(taskGroupQueues)) {
log.debug("There is no waiting task group queue for task group {}", taskGroup.getName());
continue;
}
for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
try {
LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId());
// Reduce the taskGroupSize
boolean acquireResult = taskGroupDao.acquireTaskGroupSlot(taskGroup.getId());
if (!acquireResult) {
log.error("Failed to acquire task group slot for task group {}", taskGroup);
continue;
}
// Notify the waiting task instance
// We notify first, it notify failed, the taskGroupQueue will be in queue, and then we will retry it
// next time.
notifyWaitingTaskInstance(taskGroupQueue);
// Set the taskGroupQueue status to RUNNING and remove from queue
taskGroupQueue.setInQueue(Flag.YES.getCode());
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueDao.updateById(taskGroupQueue);
} catch (UnsupportedOperationException unsupportedOperationException) {
releaseTaskGroupQueueSlot(taskGroupQueue);
log.info(
"Notify the Waiting TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue",
taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException);
} catch (Throwable throwable) {
log.error("Notify Waiting TaskGroupQueue: {} failed", taskGroupQueue, throwable);
} finally {
LogUtils.removeTaskInstanceIdMDC();
}
}
}
}
/**
* If the {@link TaskInstance#getTaskGroupId()} > 0, and the TaskGroup flag is {@link Flag#YES} then the task instance need to use task group.
*
* @param taskInstance task instance
* @return true if the TaskInstance need to acquireTaskGroupSlot
*/
public boolean needAcquireTaskGroupSlot(TaskInstance taskInstance) {
if (taskInstance == null) {
throw new IllegalArgumentException("The TaskInstance is null");
}
if (taskInstance.getTaskGroupId() <= 0) {
log.debug("The current TaskInstance doesn't use TaskGroup, no need to acquire TaskGroupSlot");
return false;
}
TaskGroup taskGroup = taskGroupDao.queryById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
log.warn("The current TaskGroup: {} does not exist, will not acquire TaskGroupSlot",
taskInstance.getTaskGroupId());
return false;
}
return Flag.YES.equals(taskGroup.getStatus());
}
/**
* Acquire the task group slot for the given {@link TaskInstance}.
* <p>
* When taskInstance want to acquire a TaskGroup slot, should call this method. If acquire successfully, will create a TaskGroupQueue in db which is in queue and status is {@link TaskGroupQueueStatus#WAIT_QUEUE}.
* The TaskInstance shouldn't dispatch until there exist available slot, the taskGroupCoordinator notify it.
*
* @param taskInstance the task instance which want to acquire task group slot.
* @throws IllegalArgumentException if the taskInstance is null or the used taskGroup doesn't exist.
*/
public void acquireTaskGroupSlot(TaskInstance taskInstance) {
if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) {
throw new IllegalArgumentException("The current TaskInstance does not use task group");
}
TaskGroup taskGroup = taskGroupDao.queryById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
throw new IllegalArgumentException(
"The current TaskGroup: " + taskInstance.getTaskGroupId() + " does not exist");
}
// Write TaskGroupQueue in db, and then return wait TaskGroupCoordinator to notify it
// Set the taskGroupQueue status to WAIT_QUEUE and add to queue
// The queue only contains the taskGroupQueue which status is WAIT_QUEUE or ACQUIRE_SUCCESS
Date now = new Date();
TaskGroupQueue taskGroupQueue = TaskGroupQueue
.builder()
.taskId(taskInstance.getId())
.taskName(taskInstance.getName())
.groupId(taskInstance.getTaskGroupId())
.processId(taskInstance.getProcessInstanceId())
.priority(taskInstance.getTaskGroupPriority())
.inQueue(Flag.YES.getCode())
.forceStart(Flag.NO.getCode())
.status(TaskGroupQueueStatus.WAIT_QUEUE)
.createTime(now)
.updateTime(now)
.build();
log.info("Success insert TaskGroupQueue: {} for TaskInstance: {}", taskGroupQueue, taskInstance.getName());
taskGroupQueueDao.insert(taskGroupQueue);
}
/**
* If the TaskInstance is using TaskGroup then it need to release TaskGroupSlot.
*
* @param taskInstance taskInsatnce
* @return true if the TaskInstance need to release TaskGroupSlot
*/
public boolean needToReleaseTaskGroupSlot(TaskInstance taskInstance) {
if (taskInstance == null) {
throw new IllegalArgumentException("The TaskInstance is null");
}
if (taskInstance.getTaskGroupId() <= 0) {
log.debug("The current TaskInstance doesn't use TaskGroup, no need to release TaskGroupSlot");
return false;
}
return true;
}
/**
* Release the task group slot for the given {@link TaskInstance}.
* <p>
* When taskInstance want to release a TaskGroup slot, should call this method. The release method will move the TaskGroupQueue out queue and set status to {@link TaskGroupQueueStatus#RELEASE}.
* This method is idempotent, this means that if the task group slot is already released, this method will do nothing.
*
* @param taskInstance the task instance which want to release task group slot.
* @throws IllegalArgumentException If the taskInstance is null or the task doesn't use task group.
*/
public void releaseTaskGroupSlot(TaskInstance taskInstance) {
if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) {
throw new IllegalArgumentException("The current TaskInstance does not use task group");
}
List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId());
for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
releaseTaskGroupQueueSlot(taskGroupQueue);
}
}
private void notifyWaitingTaskInstance(TaskGroupQueue taskGroupQueue) {
// Find the related waiting task instance
// send RPC to notify the waiting task instance
TaskInstance taskInstance = taskInstanceDao.queryById(taskGroupQueue.getTaskId());
if (taskInstance == null) {
throw new UnsupportedOperationException(
"The TaskInstance: " + taskGroupQueue.getTaskId() + " is not exist, no need to notify");
}
// todo: We may need to add a new status to represent the task instance is waiting for task group slot
if (taskInstance.getState() != TaskExecutionStatus.SUBMITTED_SUCCESS) {
throw new UnsupportedOperationException(
"The TaskInstance: " + taskInstance.getId() + " state is " + taskInstance.getState()
+ ", no need to notify");
}
ProcessInstance processInstance = processInstanceDao.queryById(taskInstance.getProcessInstanceId());
if (processInstance == null) {
throw new UnsupportedOperationException(
"The WorkflowInstance: " + taskInstance.getProcessInstanceId()
+ " is not exist, no need to notify");
}
if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {
throw new UnsupportedOperationException(
"The WorkflowInstance: " + processInstance.getId() + " state is " + processInstance.getState()
+ ", no need to notify");
}
if (processInstance.getHost() == null || Constants.NULL.equals(processInstance.getHost())) {
throw new UnsupportedOperationException(
"WorkflowInstance host is null, maybe it is in failover: " + processInstance);
}
TaskInstanceWakeupRequest taskInstanceWakeupRequest = TaskInstanceWakeupRequest.builder()
.processInstanceId(processInstance.getId())
.taskInstanceId(taskInstance.getId())
.build();
IWorkflowInstanceService iWorkflowInstanceService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(), IWorkflowInstanceService.class);
TaskInstanceWakeupResponse taskInstanceWakeupResponse =
iWorkflowInstanceService.wakeupTaskInstance(taskInstanceWakeupRequest);
if (!taskInstanceWakeupResponse.isSuccess()) {
throw new UnsupportedOperationException(
"Notify TaskInstance: " + taskInstance.getId() + " failed: " + taskInstanceWakeupResponse);
}
log.info("Wake up TaskInstance: {} success", taskInstance.getName());
}
private void releaseTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
if (TaskGroupQueueStatus.RELEASE.equals(taskGroupQueue.getStatus())
&& Flag.NO.getCode() == taskGroupQueue.getInQueue()) {
log.info("The TaskGroupQueue: {} is already released", taskGroupQueue);
return;
}
taskGroupQueue.setInQueue(Flag.NO.getCode());
taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE);
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueDao.updateById(taskGroupQueue);
log.info("Success release TaskGroupQueue: {}", taskGroupQueue);
}
}

11
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator;
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -49,7 +50,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@ -104,6 +104,8 @@ public class WorkflowExecuteRunnableTest {
private ListenerEventAlertManager listenerEventAlertManager;
private TaskGroupCoordinator taskGroupCoordinator;
@BeforeEach
public void init() throws Exception {
applicationContext = Mockito.mock(ApplicationContext.class);
@ -138,6 +140,8 @@ public class WorkflowExecuteRunnableTest {
Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph);
Mockito.when(workflowGraph.getDag()).thenReturn(new DAG<>());
taskGroupCoordinator = Mockito.mock(TaskGroupCoordinator.class);
workflowExecuteThread = Mockito.spy(
new WorkflowExecuteRunnable(
workflowExecuteContext,
@ -150,11 +154,12 @@ public class WorkflowExecuteRunnableTest {
curingGlobalParamsService,
taskInstanceDao,
defaultTaskExecuteRunnableFactory,
listenerEventAlertManager));
listenerEventAlertManager,
taskGroupCoordinator));
}
@Test
public void testParseStartNodeName() throws ParseException {
public void testParseStartNodeName() {
try {
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_START_NODES, "1,2,3");

182
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.taskgroup;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskGroupDao;
import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import com.google.common.collect.Lists;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class TaskGroupCoordinatorTest {
@InjectMocks
private TaskGroupCoordinator taskGroupCoordinator;
@Mock
private RegistryClient registryClient;
@Mock
private TaskGroupDao taskGroupDao;
@Mock
private TaskGroupQueueDao taskGroupQueueDao;
@Mock
private TaskInstanceDao taskInstanceDao;
@Mock
private ProcessInstanceDao processInstanceDao;
@Test
void start() throws InterruptedException {
// Get the Lock from Registry
taskGroupCoordinator.start();
Thread.sleep(1_000);
verify(registryClient, Mockito.times(1))
.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
verify(registryClient, Mockito.times(1))
.releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
}
@Test
void needAcquireTaskGroupSlot() {
// TaskInstance is null
IllegalArgumentException illegalArgumentException =
assertThrows(IllegalArgumentException.class, () -> taskGroupCoordinator.needAcquireTaskGroupSlot(null));
assertEquals("The TaskInstance is null", illegalArgumentException.getMessage());
// TaskGroupId < 0
TaskInstance taskInstance = new TaskInstance();
assertFalse(taskGroupCoordinator.needAcquireTaskGroupSlot(taskInstance));
// TaskGroup not exist
taskInstance.setTaskGroupId(1);
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(null);
assertFalse(taskGroupCoordinator.needAcquireTaskGroupSlot(taskInstance));
// TaskGroup is closed
TaskGroup taskGroup = new TaskGroup();
taskGroup.setStatus(Flag.NO);
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(taskGroup);
assertFalse(taskGroupCoordinator.needAcquireTaskGroupSlot(taskInstance));
// TaskGroup is open
taskGroup.setStatus(Flag.YES);
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(taskGroup);
assertTrue(taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance));
}
@Test
void acquireTaskGroupSlot() {
// TaskInstance is NULL
IllegalArgumentException illegalArgumentException =
assertThrows(IllegalArgumentException.class, () -> taskGroupCoordinator.acquireTaskGroupSlot(null));
assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
// TaskGroupId is NULL
TaskInstance taskInstance = new TaskInstance();
illegalArgumentException = assertThrows(IllegalArgumentException.class,
() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
// TaskGroup not exist
taskInstance.setTaskGroupId(1);
taskInstance.setId(1);
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(null);
illegalArgumentException = assertThrows(IllegalArgumentException.class,
() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
assertEquals("The current TaskGroup: 1 does not exist", illegalArgumentException.getMessage());
// TaskGroup exist
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(new TaskGroup());
Assertions.assertDoesNotThrow(() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
}
@Test
void needToReleaseTaskGroupSlot() {
IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class,
() -> taskGroupCoordinator.needToReleaseTaskGroupSlot(null));
assertEquals("The TaskInstance is null", illegalArgumentException.getMessage());
TaskInstance taskInstance = new TaskInstance();
assertFalse(taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance));
taskInstance.setTaskGroupId(1);
assertTrue(taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance));
}
@Test
void releaseTaskGroupSlot() {
// TaskInstance is NULL
IllegalArgumentException illegalArgumentException =
assertThrows(IllegalArgumentException.class, () -> taskGroupCoordinator.releaseTaskGroupSlot(null));
assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
// TaskGroupId is NULL
TaskInstance taskInstance = new TaskInstance();
illegalArgumentException = assertThrows(IllegalArgumentException.class,
() -> taskGroupCoordinator.releaseTaskGroupSlot(taskInstance));
assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
// Release TaskGroupQueue
taskInstance.setId(1);
taskInstance.setTaskGroupId(1);
TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
List<TaskGroupQueue> taskGroupQueues = Lists.newArrayList(taskGroupQueue);
when(taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId())).thenReturn(taskGroupQueues);
taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
assertEquals(Flag.NO.getCode(), taskGroupQueue.getInQueue());
assertEquals(TaskGroupQueueStatus.RELEASE, taskGroupQueue.getStatus());
verify(taskGroupQueueDao, Mockito.times(1)).updateById(taskGroupQueue);
}
}

1
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java

@ -28,6 +28,7 @@ public enum RegistryNodeType {
MASTER("Master", "/nodes/master"),
MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"),
WORKER("Worker", "/nodes/worker"),
ALERT_SERVER("AlertServer", "/nodes/alert-server"),
ALERT_LOCK("AlertNodeLock", "/lock/alert"),

33
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@ -51,7 +50,6 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.model.TaskNode;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
@ -72,8 +70,6 @@ public interface ProcessService {
ProcessInstance findProcessInstanceById(int processId);
ProcessDefinition findProcessDefineById(int processDefinitionId);
ProcessDefinition findProcessDefinition(Long processDefinitionCode, int processDefinitionVersion);
ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode);
@ -92,9 +88,6 @@ public interface ProcessService {
void setSubProcessParam(ProcessInstance subProcessInstance);
boolean submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes,
long commitInterval);
@Transactional
boolean submitTask(ProcessInstance processInstance, TaskInstance taskInstance);
@ -129,18 +122,10 @@ public interface ProcessService {
DataSource findDataSourceById(int id);
ProcessInstance findProcessInstanceByTaskId(int taskId);
List<UdfFunc> queryUdfFunListByIds(Integer[] ids);
List<Schedule> selectAllByProcessDefineCode(long[] codes);
String queryUserQueueByProcessInstance(ProcessInstance processInstance);
ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId);
List<Project> getProjectListHavePerm(int userId);
<T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType);
User getUserById(int userId);
@ -175,8 +160,6 @@ public interface ProcessService {
List<TaskNode> transformTask(List<ProcessTaskRelation> taskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs);
Map<ProcessInstance, TaskInstance> notifyProcessList(int processId);
DqExecuteResult getDqExecuteResultByTaskInstanceId(int taskInstanceId);
int updateDqExecuteResultUserId(int taskInstanceId);
@ -195,16 +178,6 @@ public interface ProcessService {
DqComparisonType getComparisonTypeById(int id);
boolean acquireTaskGroup(int taskId,
String taskName, int groupId,
int processId, int priority);
boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue);
void releaseAllTaskGroup(int processInstanceId);
TaskInstance releaseTaskGroup(TaskInstance taskInstance);
void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status);
TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
@ -214,12 +187,6 @@ public interface ProcessService {
Integer priority,
TaskGroupQueueStatus status);
int updateTaskGroupQueueStatus(Integer taskId, int status);
int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue);
TaskGroupQueue loadTaskGroupQueue(int taskId);
ProcessInstance loadNextProcess4Serial(long code, int state, int id);
public String findConfigYamlByName(String clusterName);

315
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -67,12 +67,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -410,17 +408,6 @@ public class ProcessServiceImpl implements ProcessService {
return processInstanceMapper.selectById(processId);
}
/**
* find process define by id.
*
* @param processDefinitionId processDefinitionId
* @return process definition
*/
@Override
public ProcessDefinition findProcessDefineById(int processDefinitionId) {
return processDefineMapper.selectById(processDefinitionId);
}
/**
* find process define by code and version.
*
@ -1080,34 +1067,6 @@ public class ProcessServiceImpl implements ProcessService {
taskInstanceDao.updateById(taskInstance);
}
/**
* retry submit task to db
*/
@Override
public boolean submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance,
int commitRetryTimes, long commitInterval) {
int retryTimes = 1;
while (retryTimes <= commitRetryTimes) {
try {
// submit task to db
// Only want to use transaction here
if (submitTask(processInstance, taskInstance)) {
return true;
}
log.error(
"task commit to db failed , taskCode: {} has already retry {} times, please check the database",
taskInstance.getTaskCode(),
retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
log.error("task commit to db failed", e);
} finally {
retryTimes += 1;
}
}
return false;
}
/**
* // todo: This method need to refactor, we find when the db down, but the taskInstanceId is not 0. It's better to change to void, rather than return TaskInstance
* submit task to db
@ -1260,20 +1219,6 @@ public class ProcessServiceImpl implements ProcessService {
}
}
/**
* get sub work flow command type
* child instance exist: child command = fatherCommand
* child instance not exists: child command = fatherCommand[0]
*/
private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
CommandType commandType = parentProcessInstance.getCommandType();
if (childInstance == null) {
String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
}
return commandType;
}
/**
* update sub process definition
*
@ -1573,21 +1518,6 @@ public class ProcessServiceImpl implements ProcessService {
return dataSourceMapper.selectById(id);
}
/**
* find process instance by the task id
*
* @param taskId taskId
* @return process instance
*/
@Override
public ProcessInstance findProcessInstanceByTaskId(int taskId) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskId);
if (taskInstance != null) {
return processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
}
return null;
}
/**
* find udf function list by id list string
*
@ -1599,37 +1529,6 @@ public class ProcessServiceImpl implements ProcessService {
return udfFuncMapper.queryUdfByIdStr(ids, null);
}
/**
* find schedule list by process define codes.
*
* @param codes codes
* @return schedule list
*/
@Override
public List<Schedule> selectAllByProcessDefineCode(long[] codes) {
return scheduleMapper.selectAllByProcessDefineArray(codes);
}
/**
* query user queue by process instance
*
* @param processInstance processInstance
* @return queue
*/
@Override
public String queryUserQueueByProcessInstance(ProcessInstance processInstance) {
String queue = "";
if (processInstance == null) {
return queue;
}
User executor = userMapper.selectById(processInstance.getExecutorId());
if (executor != null) {
queue = executor.getQueue();
}
return queue;
}
/**
* query project name and user name by processInstanceId.
*
@ -1641,27 +1540,6 @@ public class ProcessServiceImpl implements ProcessService {
return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId);
}
/**
* get have perm project list
*
* @param userId userId
* @return project list
*/
@Override
public List<Project> getProjectListHavePerm(int userId) {
List<Project> createProjects = projectMapper.queryProjectCreatedByUser(userId);
List<Project> authedProjects = projectMapper.queryAuthedProjectListByUserId(userId);
if (createProjects == null) {
createProjects = new ArrayList<>();
}
if (authedProjects != null) {
createProjects.addAll(authedProjects);
}
return createProjects;
}
/**
* list unauthorized udf function
*
@ -2108,23 +1986,6 @@ public class ProcessServiceImpl implements ProcessService {
return taskNodeList;
}
@Override
public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId) {
HashMap<ProcessInstance, TaskInstance> processTaskMap = new HashMap<>();
// find sub tasks
ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(processId);
if (processInstanceMap == null) {
return processTaskMap;
}
ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
TaskInstance fatherTask = taskInstanceDao.queryById(processInstanceMap.getParentTaskInstanceId());
if (fatherProcess != null) {
processTaskMap.put(fatherProcess, fatherTask);
}
return processTaskMap;
}
@Override
public DqExecuteResult getDqExecuteResultByTaskInstanceId(int taskInstanceId) {
return dqExecuteResultMapper.getExecuteResultById(taskInstanceId);
@ -2195,167 +2056,6 @@ public class ProcessServiceImpl implements ProcessService {
return dqComparisonTypeMapper.selectById(id);
}
/**
* the first time (when submit the task ) get the resource of the task group
*/
@Override
public boolean acquireTaskGroup(int taskInstanceId,
String taskName,
int taskGroupId,
int workflowInstanceId,
int taskGroupPriority) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupId);
if (taskGroup == null) {
// we don't throw exception here, to avoid the task group has been deleted during workflow running
log.warn("The taskGroup is not exist no need to acquire taskGroup, taskGroupId: {}", taskGroupId);
return true;
}
// if task group is not applicable
if (taskGroup.getStatus() == Flag.NO.getCode()) {
log.warn("The taskGroup status is {}, no need to acquire taskGroup, taskGroupId: {}", taskGroup.getStatus(),
taskGroupId);
return true;
}
// Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstanceId);
if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue(
taskInstanceId,
taskName,
taskGroupId,
workflowInstanceId,
taskGroupPriority,
TaskGroupQueueStatus.WAIT_QUEUE);
log.info("Insert TaskGroupQueue: {} successfully", taskGroupQueue.getId());
} else {
log.info("The task queue is already exist, taskId: {}", taskInstanceId);
if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
return true;
}
}
// check if there already exist higher priority tasks
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(
taskGroupId,
taskGroupPriority,
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
return false;
}
// try to get taskGroup
int availableTaskGroupCount = taskGroupMapper.selectAvailableCountById(taskGroupId);
if (availableTaskGroupCount < 1) {
log.info(
"Failed to acquire taskGroup, there is no avaliable taskGroup, taskInstanceId: {}, taskGroupId: {}",
taskInstanceId, taskGroupId);
taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false;
}
return robTaskGroupResource(taskGroupQueue);
}
/**
* try to get the task group resource(when other task release the resource)
*/
@Override
public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
// set the default max size to avoid dead loop
for (int i = 0; i < 10; i++) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
// remove
taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
log.info("The current task Group is full, taskGroup: {}", taskGroup);
return false;
}
int affectedCount = taskGroupMapper.robTaskGroupResource(
taskGroup.getId(),
taskGroup.getUseSize(),
taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
log.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(),
taskGroupQueue.getId());
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return true;
}
}
log.info("Failed to rob taskGroup, taskGroupQueue: {}", taskGroupQueue);
taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false;
}
@Override
public void releaseAllTaskGroup(int processInstanceId) {
List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId,
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
for (TaskInstance info : taskInstances) {
releaseTaskGroup(info);
}
}
/**
* release the TGQ resource when the corresponding task is finished.
*
* @return the result code and msg
*/
@Override
public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
TaskGroup taskGroup;
TaskGroupQueue thisTaskGroupQueue;
log.info("Begin to release task group: {}", taskInstance.getTaskGroupId());
try {
do {
taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
log.error("The taskGroup is not exist no need to release taskGroup, taskGroupId: {}",
taskInstance.getTaskGroupId());
return null;
}
thisTaskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
log.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId());
return null;
}
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE) {
log.info("The taskGroupQueue's status is in waiting, will not need to release task group");
break;
}
} while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
&& taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize(),
thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
} catch (Exception e) {
log.error("release the task group error", e);
return null;
}
log.info("Finished to release task group, taskGroupId: {}", taskInstance.getTaskGroupId());
log.info("Begin to release task group queue, taskGroupId: {}", taskInstance.getTaskGroupId());
changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
TaskGroupQueue taskGroupQueue;
do {
taskGroupQueue = taskGroupQueueMapper.queryTheHighestPriorityTasks(
taskGroup.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
Flag.NO.getCode(),
Flag.NO.getCode());
if (taskGroupQueue == null) {
log.info("There is no taskGroupQueue need to be wakeup taskGroup: {}", taskGroup.getId());
return null;
}
} while (this.taskGroupQueueMapper.updateInQueueCAS(
Flag.NO.getCode(),
Flag.YES.getCode(),
taskGroupQueue.getId()) != 1);
log.info("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}",
taskInstance.getTaskGroupId(), taskGroupQueue.getId());
return taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
}
/**
* release the TGQ resource when the corresponding task is finished.
*
@ -2396,21 +2096,6 @@ public class ProcessServiceImpl implements ProcessService {
return taskGroupQueue;
}
@Override
public int updateTaskGroupQueueStatus(Integer taskId, int status) {
return taskGroupQueueMapper.updateStatusByTaskId(taskId, status);
}
@Override
public int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue) {
return taskGroupQueueMapper.updateById(taskGroupQueue);
}
@Override
public TaskGroupQueue loadTaskGroupQueue(int taskId) {
return this.taskGroupQueueMapper.queryByTaskId(taskId);
}
@Override
public ProcessInstance loadNextProcess4Serial(long code, int state, int id) {
return this.processInstanceMapper.loadNextProcess4Serial(code, state, id);

16
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -749,22 +749,6 @@ public class ProcessServiceTest {
Assertions.assertNotNull(taskGroupQueue);
}
@Test
public void testDoRelease() {
TaskGroupQueue taskGroupQueue = getTaskGroupQueue();
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setTaskGroupId(taskGroupQueue.getGroupId());
when(taskGroupQueueMapper.queryByTaskId(1)).thenReturn(taskGroupQueue);
when(taskGroupQueueMapper.updateById(taskGroupQueue)).thenReturn(1);
processService.releaseTaskGroup(taskInstance);
}
private TaskGroupQueue getTaskGroupQueue() {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
taskGroupQueue.setTaskName("task name");

Loading…
Cancel
Save