From 04a6b0d2814c69b056639f65546ffd62ead22321 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 4 Mar 2024 17:30:43 +0800 Subject: [PATCH] Fix task mighe be dispatched even if it has been killed (#15662) --- .../GlobalTaskDispatchWaitingQueue.java | 4 +- .../GlobalTaskDispatchWaitingQueueLooper.java | 19 +-- ...balTaskDispatchWaitingQueueLooperTest.java | 110 ++++++++++++++++++ pom.xml | 7 ++ 4 files changed, 131 insertions(+), 9 deletions(-) create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java index 7e0d683571..f03bd6b903 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner; import java.util.concurrent.DelayQueue; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -37,7 +38,8 @@ public class GlobalTaskDispatchWaitingQueue { queue.put(priorityTaskExecuteRunnable); } - public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() throws InterruptedException { + @SneakyThrows + public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() { return queue.take(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java index a1f4b28783..49234a99d3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory; import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher; @@ -65,14 +66,15 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple public void run() { DefaultTaskExecuteRunnable defaultTaskExecuteRunnable; while (RUNNING_FLAG.get()) { + defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable(); try { - defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable(); - } catch (InterruptedException e) { - log.warn("Get waiting dispatch task failed, the current thread has been interrupted, will stop loop"); - Thread.currentThread().interrupt(); - break; - } - try { + TaskExecutionStatus status = defaultTaskExecuteRunnable.getTaskInstance().getState(); + if (status != TaskExecutionStatus.SUBMITTED_SUCCESS) { + log.warn("The TaskInstance {} state is : {}, will not dispatch", + defaultTaskExecuteRunnable.getTaskInstance().getName(), status); + continue; + } + TaskDispatcher taskDispatcher = taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance()); taskDispatcher.dispatchTask(defaultTaskExecuteRunnable); @@ -86,7 +88,6 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple log.error("Dispatch Task: {} failed", defaultTaskExecuteRunnable.getTaskInstance().getName(), e); } } - log.info("GlobalTaskDispatchWaitingQueueLooper started..."); } @Override @@ -94,6 +95,8 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple if (RUNNING_FLAG.compareAndSet(true, false)) { log.info("GlobalTaskDispatchWaitingQueueLooper stopping..."); log.info("GlobalTaskDispatchWaitingQueueLooper stopped..."); + } else { + log.error("GlobalTaskDispatchWaitingQueueLooper is not started"); } } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java new file mode 100644 index 0000000000..ea45ab17b3 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import static java.time.Duration.ofSeconds; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory; +import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher; +import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager; + +import java.util.HashMap; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class GlobalTaskDispatchWaitingQueueLooperTest { + + @InjectMocks + private GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper; + + @Mock + private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue; + + @Mock + private TaskDispatchFactory taskDispatchFactory; + + @Test + void testTaskExecutionRunnableStatusIsNotSubmitted() throws Exception { + ProcessInstance processInstance = new ProcessInstance(); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setState(TaskExecutionStatus.KILL); + taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>())); + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager = + new TaskExecuteRunnableOperatorManager(); + DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = new DefaultTaskExecuteRunnable(processInstance, + taskInstance, taskExecutionContext, taskExecuteRunnableOperatorManager); + + TaskDispatcher taskDispatcher = mock(TaskDispatcher.class); + when(taskDispatchFactory.getTaskDispatcher(taskInstance)).thenReturn(taskDispatcher); + doNothing().when(taskDispatcher).dispatchTask(any()); + + when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(defaultTaskExecuteRunnable); + globalTaskDispatchWaitingQueueLooper.start(); + await().during(ofSeconds(1)) + .untilAsserted(() -> verify(taskDispatchFactory, never()).getTaskDispatcher(taskInstance)); + globalTaskDispatchWaitingQueueLooper.close(); + } + + @Test + void testTaskExecutionRunnableStatusIsSubmitted() throws Exception { + ProcessInstance processInstance = new ProcessInstance(); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS); + taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>())); + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager = + new TaskExecuteRunnableOperatorManager(); + DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = new DefaultTaskExecuteRunnable(processInstance, + taskInstance, taskExecutionContext, taskExecuteRunnableOperatorManager); + + TaskDispatcher taskDispatcher = mock(TaskDispatcher.class); + when(taskDispatchFactory.getTaskDispatcher(taskInstance)).thenReturn(taskDispatcher); + doNothing().when(taskDispatcher).dispatchTask(any()); + + when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(defaultTaskExecuteRunnable); + globalTaskDispatchWaitingQueueLooper.start(); + await().atMost(ofSeconds(1)).untilAsserted(() -> { + verify(taskDispatchFactory, atLeastOnce()).getTaskDispatcher(any(TaskInstance.class)); + verify(taskDispatcher, atLeastOnce()).dispatchTask(any(TaskExecuteRunnable.class)); + }); + globalTaskDispatchWaitingQueueLooper.close(); + + } +} diff --git a/pom.xml b/pom.xml index 260524b1e5..4a71741f8e 100755 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ 3.0.0 7.1.2 1.18.20 + 4.2.0 apache ${project.name} ${project.version} @@ -365,6 +366,12 @@ ${lombok.version} provided + + org.awaitility + awaitility + ${awaitility.version} + test +