diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java index bd7af94611..aecc32287b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java @@ -166,7 +166,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread { } /** - * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished. + * Clear the TaskGroupQueue when the related {@link TaskInstance} is not exist or status is finished. */ private void amendTaskGroupQueueStatus() { int minTaskGroupQueueId = -1; @@ -188,7 +188,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread { } /** - * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished. + * Clear the TaskGroupQueue when the related {@link TaskInstance} is not exist or status is finished. */ private void amendTaskGroupQueueStatus(List taskGroupQueues) { List taskInstanceIds = taskGroupQueues.stream() @@ -205,14 +205,14 @@ public class TaskGroupCoordinator extends BaseDaemonThread { if (taskInstance == null) { log.warn("The TaskInstance: {} is not exist, will release the TaskGroupQueue: {}", taskId, taskGroupQueue); - releaseTaskGroupQueueSlot(taskGroupQueue); + deleteTaskGroupQueueSlot(taskGroupQueue); continue; } if (taskInstance.getState().isFinished()) { log.warn("The TaskInstance: {} state: {} finished, will release the TaskGroupQueue: {}", taskInstance.getName(), taskInstance.getState(), taskGroupQueue); - releaseTaskGroupQueueSlot(taskGroupQueue); + deleteTaskGroupQueueSlot(taskGroupQueue); continue; } } @@ -257,13 +257,10 @@ public class TaskGroupCoordinator extends BaseDaemonThread { taskGroupQueue.getTaskName(), taskGroupQueue.getId()); - taskGroupQueue.setInQueue(Flag.NO.getCode()); - taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE); - taskGroupQueue.setUpdateTime(new Date()); - taskGroupQueueDao.updateById(taskGroupQueue); + deleteTaskGroupQueueSlot(taskGroupQueue); log.info("Release the force start TaskGroupQueue {}", taskGroupQueue); } catch (UnsupportedOperationException unsupportedOperationException) { - releaseTaskGroupQueueSlot(taskGroupQueue); + deleteTaskGroupQueueSlot(taskGroupQueue); log.info( "Notify the ForceStart TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue", taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException); @@ -322,7 +319,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread { taskGroupQueue.setUpdateTime(new Date()); taskGroupQueueDao.updateById(taskGroupQueue); } catch (UnsupportedOperationException unsupportedOperationException) { - releaseTaskGroupQueueSlot(taskGroupQueue); + deleteTaskGroupQueueSlot(taskGroupQueue); log.info( "Notify the Waiting TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue", taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException); @@ -417,7 +414,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread { /** * Release the task group slot for the given {@link TaskInstance}. *

- * When taskInstance want to release a TaskGroup slot, should call this method. The release method will move the TaskGroupQueue out queue and set status to {@link TaskGroupQueueStatus#RELEASE}. + * When taskInstance want to release a TaskGroup slot, should call this method. The release method will delete the taskGroupQueue. * This method is idempotent, this means that if the task group slot is already released, this method will do nothing. * * @param taskInstance the task instance which want to release task group slot. @@ -429,7 +426,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread { } List taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId()); for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { - releaseTaskGroupQueueSlot(taskGroupQueue); + deleteTaskGroupQueueSlot(taskGroupQueue); } } @@ -479,16 +476,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread { log.info("Wake up TaskInstance: {} success", taskInstance.getName()); } - private void releaseTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) { - if (TaskGroupQueueStatus.RELEASE.equals(taskGroupQueue.getStatus()) - && Flag.NO.getCode() == taskGroupQueue.getInQueue()) { - log.info("The TaskGroupQueue: {} is already released", taskGroupQueue); - return; - } - taskGroupQueue.setInQueue(Flag.NO.getCode()); - taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE); - taskGroupQueue.setUpdateTime(new Date()); - taskGroupQueueDao.updateById(taskGroupQueue); + private void deleteTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) { + taskGroupQueueDao.deleteById(taskGroupQueue); log.info("Success release TaskGroupQueue: {}", taskGroupQueue); } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java index f654550b75..9214afaba8 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.dao.entity.TaskGroup; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -174,9 +173,7 @@ class TaskGroupCoordinatorTest { when(taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId())).thenReturn(taskGroupQueues); taskGroupCoordinator.releaseTaskGroupSlot(taskInstance); - assertEquals(Flag.NO.getCode(), taskGroupQueue.getInQueue()); - assertEquals(TaskGroupQueueStatus.RELEASE, taskGroupQueue.getStatus()); - verify(taskGroupQueueDao, Mockito.times(1)).updateById(taskGroupQueue); + verify(taskGroupQueueDao, Mockito.times(1)).deleteById(taskGroupQueue); } }