Browse Source

[Improvement] Delete taskGroupQueue when release (#16425)

dev
Wenjun Ruan 4 months ago committed by GitHub
parent
commit
34b74525d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 33
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
  2. 5
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java

33
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java

@ -166,7 +166,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
} }
/** /**
* Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished. * Clear the TaskGroupQueue when the related {@link TaskInstance} is not exist or status is finished.
*/ */
private void amendTaskGroupQueueStatus() { private void amendTaskGroupQueueStatus() {
int minTaskGroupQueueId = -1; int minTaskGroupQueueId = -1;
@ -188,7 +188,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
} }
/** /**
* Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished. * Clear the TaskGroupQueue when the related {@link TaskInstance} is not exist or status is finished.
*/ */
private void amendTaskGroupQueueStatus(List<TaskGroupQueue> taskGroupQueues) { private void amendTaskGroupQueueStatus(List<TaskGroupQueue> taskGroupQueues) {
List<Integer> taskInstanceIds = taskGroupQueues.stream() List<Integer> taskInstanceIds = taskGroupQueues.stream()
@ -205,14 +205,14 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
if (taskInstance == null) { if (taskInstance == null) {
log.warn("The TaskInstance: {} is not exist, will release the TaskGroupQueue: {}", taskId, log.warn("The TaskInstance: {} is not exist, will release the TaskGroupQueue: {}", taskId,
taskGroupQueue); taskGroupQueue);
releaseTaskGroupQueueSlot(taskGroupQueue); deleteTaskGroupQueueSlot(taskGroupQueue);
continue; continue;
} }
if (taskInstance.getState().isFinished()) { if (taskInstance.getState().isFinished()) {
log.warn("The TaskInstance: {} state: {} finished, will release the TaskGroupQueue: {}", log.warn("The TaskInstance: {} state: {} finished, will release the TaskGroupQueue: {}",
taskInstance.getName(), taskInstance.getState(), taskGroupQueue); taskInstance.getName(), taskInstance.getState(), taskGroupQueue);
releaseTaskGroupQueueSlot(taskGroupQueue); deleteTaskGroupQueueSlot(taskGroupQueue);
continue; continue;
} }
} }
@ -257,13 +257,10 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
taskGroupQueue.getTaskName(), taskGroupQueue.getTaskName(),
taskGroupQueue.getId()); taskGroupQueue.getId());
taskGroupQueue.setInQueue(Flag.NO.getCode()); deleteTaskGroupQueueSlot(taskGroupQueue);
taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE);
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueDao.updateById(taskGroupQueue);
log.info("Release the force start TaskGroupQueue {}", taskGroupQueue); log.info("Release the force start TaskGroupQueue {}", taskGroupQueue);
} catch (UnsupportedOperationException unsupportedOperationException) { } catch (UnsupportedOperationException unsupportedOperationException) {
releaseTaskGroupQueueSlot(taskGroupQueue); deleteTaskGroupQueueSlot(taskGroupQueue);
log.info( log.info(
"Notify the ForceStart TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue", "Notify the ForceStart TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue",
taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException); taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException);
@ -322,7 +319,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
taskGroupQueue.setUpdateTime(new Date()); taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueDao.updateById(taskGroupQueue); taskGroupQueueDao.updateById(taskGroupQueue);
} catch (UnsupportedOperationException unsupportedOperationException) { } catch (UnsupportedOperationException unsupportedOperationException) {
releaseTaskGroupQueueSlot(taskGroupQueue); deleteTaskGroupQueueSlot(taskGroupQueue);
log.info( log.info(
"Notify the Waiting TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue", "Notify the Waiting TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue",
taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException); taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException);
@ -417,7 +414,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
/** /**
* Release the task group slot for the given {@link TaskInstance}. * Release the task group slot for the given {@link TaskInstance}.
* <p> * <p>
* When taskInstance want to release a TaskGroup slot, should call this method. The release method will move the TaskGroupQueue out queue and set status to {@link TaskGroupQueueStatus#RELEASE}. * When taskInstance want to release a TaskGroup slot, should call this method. The release method will delete the taskGroupQueue.
* This method is idempotent, this means that if the task group slot is already released, this method will do nothing. * This method is idempotent, this means that if the task group slot is already released, this method will do nothing.
* *
* @param taskInstance the task instance which want to release task group slot. * @param taskInstance the task instance which want to release task group slot.
@ -429,7 +426,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
} }
List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId()); List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId());
for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
releaseTaskGroupQueueSlot(taskGroupQueue); deleteTaskGroupQueueSlot(taskGroupQueue);
} }
} }
@ -479,16 +476,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
log.info("Wake up TaskInstance: {} success", taskInstance.getName()); log.info("Wake up TaskInstance: {} success", taskInstance.getName());
} }
private void releaseTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) { private void deleteTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
if (TaskGroupQueueStatus.RELEASE.equals(taskGroupQueue.getStatus()) taskGroupQueueDao.deleteById(taskGroupQueue);
&& Flag.NO.getCode() == taskGroupQueue.getInQueue()) {
log.info("The TaskGroupQueue: {} is already released", taskGroupQueue);
return;
}
taskGroupQueue.setInQueue(Flag.NO.getCode());
taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE);
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueDao.updateById(taskGroupQueue);
log.info("Success release TaskGroupQueue: {}", taskGroupQueue); log.info("Success release TaskGroupQueue: {}", taskGroupQueue);
} }

5
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java

@ -25,7 +25,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.dao.entity.TaskGroup; import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -174,9 +173,7 @@ class TaskGroupCoordinatorTest {
when(taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId())).thenReturn(taskGroupQueues); when(taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId())).thenReturn(taskGroupQueues);
taskGroupCoordinator.releaseTaskGroupSlot(taskInstance); taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
assertEquals(Flag.NO.getCode(), taskGroupQueue.getInQueue()); verify(taskGroupQueueDao, Mockito.times(1)).deleteById(taskGroupQueue);
assertEquals(TaskGroupQueueStatus.RELEASE, taskGroupQueue.getStatus());
verify(taskGroupQueueDao, Mockito.times(1)).updateById(taskGroupQueue);
} }
} }

Loading…
Cancel
Save