Browse Source

[DSIP-44] Set a delay time to TaskExecuteRunnable if it dispatched failed (#16069)

upstream-dev
Wenjun Ruan 6 months ago committed by GitHub
parent
commit
e6c57430e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
  2. 28
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
  4. 35
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
  5. 53
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
  6. 85
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
  7. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
  8. 14
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
  9. 82
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
  10. 41
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
  11. 184
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
  12. 67
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
  13. 35
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
  14. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java

@ -66,7 +66,6 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
taskExecutionContext.setEnvironmentConfig(taskInstance.getEnvironmentConfig());
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
taskExecutionContext.setVarPool(taskInstance.getVarPool());
taskExecutionContext.setDryRun(taskInstance.getDryRun());
taskExecutionContext.setTestFlag(taskInstance.getTestFlag());

28
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java

@ -52,4 +52,32 @@ public abstract class BaseTaskExecuteRunnable implements TaskExecuteRunnable {
return taskExecutionContext;
}
@Override
public int compareTo(TaskExecuteRunnable other) {
if (other == null) {
return 1;
}
int workflowInstancePriorityCompareResult = workflowInstance.getProcessInstancePriority().getCode() -
other.getWorkflowInstance().getProcessInstancePriority().getCode();
if (workflowInstancePriorityCompareResult != 0) {
return workflowInstancePriorityCompareResult;
}
// smaller number, higher priority
int taskInstancePriorityCompareResult = taskInstance.getTaskInstancePriority().getCode()
- other.getTaskInstance().getTaskInstancePriority().getCode();
if (taskInstancePriorityCompareResult != 0) {
return taskInstancePriorityCompareResult;
}
// larger number, higher priority
int taskGroupPriorityCompareResult =
taskInstance.getTaskGroupPriority() - other.getTaskInstance().getTaskGroupPriority();
if (taskGroupPriorityCompareResult != 0) {
return -taskGroupPriorityCompareResult;
}
// earlier submit time, higher priority
return taskInstance.getFirstSubmitTime().compareTo(other.getTaskInstance().getFirstSubmitTime());
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java

@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
public class DefaultTaskExecuteRunnable extends PriorityDelayTaskExecuteRunnable {
public class DefaultTaskExecuteRunnable extends BaseTaskExecuteRunnable {
private final TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager;

35
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java

@ -17,7 +17,8 @@
package org.apache.dolphinscheduler.server.master.runner;
import java.util.concurrent.DelayQueue;
import org.apache.dolphinscheduler.server.master.runner.queue.DelayEntry;
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@ -25,26 +26,42 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* The class is used to store {@link TaskExecuteRunnable} which needs to be dispatched. The {@link TaskExecuteRunnable} will be stored in a {@link DelayQueue},
* if the {@link TaskExecuteRunnable}'s delay time is 0, then it will be consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
* The class is used to store {@link TaskExecuteRunnable} which needs to be dispatched. The {@link TaskExecuteRunnable}
* will be stored in {@link PriorityDelayQueue}, if the {@link TaskExecuteRunnable}'s delay time is 0, then it will be
* consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
* <p>
* The order of {@link TaskExecuteRunnable} in the {@link PriorityDelayQueue} is determined by {@link TaskExecuteRunnable#compareTo}.
*/
@Slf4j
@Component
public class GlobalTaskDispatchWaitingQueue {
private final DelayQueue<DefaultTaskExecuteRunnable> queue = new DelayQueue<>();
private final PriorityDelayQueue<DelayEntry<TaskExecuteRunnable>> priorityDelayQueue = new PriorityDelayQueue<>();
public void submitTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) {
queue.put(priorityTaskExecuteRunnable);
/**
* Submit a {@link TaskExecuteRunnable} with delay time 0, it will be consumed immediately.
*/
public void dispatchTaskExecuteRunnable(TaskExecuteRunnable taskExecuteRunnable) {
dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, 0);
}
/**
* Submit a {@link TaskExecuteRunnable} with delay time, if the delay time <= 0 then it can be consumed.
*/
public void dispatchTaskExecuteRunnableWithDelay(TaskExecuteRunnable taskExecuteRunnable, long delayTimeMills) {
priorityDelayQueue.add(new DelayEntry<>(delayTimeMills, taskExecuteRunnable));
}
/**
* Consume {@link TaskExecuteRunnable} from the {@link PriorityDelayQueue}, only the delay time <= 0 can be consumed.
*/
@SneakyThrows
public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() {
return queue.take();
public TaskExecuteRunnable takeTaskExecuteRunnable() {
return priorityDelayQueue.take().getData();
}
public int getWaitingDispatchTaskNumber() {
return queue.size();
return priorityDelayQueue.size();
}
}

53
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java

@ -18,13 +18,11 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory;
import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
@ -43,10 +41,6 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple
private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
private final AtomicInteger DISPATCHED_CONSECUTIVE_FAILURE_TIMES = new AtomicInteger();
private static final Integer MAX_DISPATCHED_FAILED_TIMES = 100;
public GlobalTaskDispatchWaitingQueueLooper() {
super("GlobalTaskDispatchWaitingQueueLooper");
}
@ -64,29 +58,34 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple
@Override
public void run() {
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
while (RUNNING_FLAG.get()) {
defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
try {
TaskExecutionStatus status = defaultTaskExecuteRunnable.getTaskInstance().getState();
if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != TaskExecutionStatus.DELAY_EXECUTION) {
log.warn("The TaskInstance {} state is : {}, will not dispatch",
defaultTaskExecuteRunnable.getTaskInstance().getName(), status);
continue;
}
doDispatch();
}
}
TaskDispatcher taskDispatcher =
taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance());
taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0);
} catch (Exception e) {
defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes();
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable);
if (DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() > MAX_DISPATCHED_FAILED_TIMES) {
ThreadUtils.sleep(10 * 1000L);
}
log.error("Dispatch Task: {} failed", defaultTaskExecuteRunnable.getTaskInstance().getName(), e);
void doDispatch() {
final TaskExecuteRunnable taskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
if (taskInstance == null) {
// This case shouldn't happen, but if it does, log an error and continue
log.error("The TaskInstance is null, drop it(This case shouldn't happen)");
return;
}
try {
TaskExecutionStatus status = taskInstance.getState();
if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != TaskExecutionStatus.DELAY_EXECUTION) {
log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), status);
return;
}
taskDispatchFactory.getTaskDispatcher(taskInstance).dispatchTask(taskExecuteRunnable);
} catch (Exception e) {
// If dispatch failed, will put the task back to the queue
// The task will be dispatched after waiting time.
// the waiting time will increase multiple of times, but will not exceed 60 seconds
long waitingTimeMills = Math.max(
taskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, waitingTimeMills);
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e);
}
}

85
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java

@ -1,85 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public abstract class PriorityDelayTaskExecuteRunnable extends BaseTaskExecuteRunnable implements Delayed {
public PriorityDelayTaskExecuteRunnable(ProcessInstance workflowInstance,
TaskInstance taskInstance,
TaskExecutionContext taskExecutionContext) {
super(workflowInstance, taskInstance, taskExecutionContext);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(
DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L),
TimeUnit.SECONDS);
}
@Override
public int compareTo(Delayed o) {
if (o == null) {
return 1;
}
int delayTimeCompareResult =
Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
if (delayTimeCompareResult != 0) {
return delayTimeCompareResult;
}
PriorityDelayTaskExecuteRunnable other = (PriorityDelayTaskExecuteRunnable) o;
// the smaller dispatch fail times, the higher priority
int dispatchFailTimesCompareResult = taskExecutionContext.getDispatchFailTimes()
- other.getTaskExecutionContext().getDispatchFailTimes();
if (dispatchFailTimesCompareResult != 0) {
return dispatchFailTimesCompareResult;
}
int workflowInstancePriorityCompareResult = workflowInstance.getProcessInstancePriority().getCode()
- other.getWorkflowInstance().getProcessInstancePriority().getCode();
if (workflowInstancePriorityCompareResult != 0) {
return workflowInstancePriorityCompareResult;
}
long workflowInstanceIdCompareResult = workflowInstance.getId().compareTo(other.getWorkflowInstance().getId());
if (workflowInstanceIdCompareResult != 0) {
return workflowInstancePriorityCompareResult;
}
int taskInstancePriorityCompareResult = taskInstance.getTaskInstancePriority().getCode()
- other.getTaskInstance().getTaskInstancePriority().getCode();
if (taskInstancePriorityCompareResult != 0) {
return taskInstancePriorityCompareResult;
}
// larger number, higher priority
int taskGroupPriorityCompareResult =
taskInstance.getTaskGroupPriority() - other.getTaskInstance().getTaskGroupPriority();
if (taskGroupPriorityCompareResult != 0) {
return -taskGroupPriorityCompareResult;
}
// The task instance shouldn't be equals
return taskInstance.getId().compareTo(other.getTaskInstance().getId());
}
}

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java

@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
* This interface is used to define a task which is executing.
* todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable
*/
public interface TaskExecuteRunnable {
public interface TaskExecuteRunnable extends Comparable<TaskExecuteRunnable> {
void dispatch();
@ -40,4 +40,5 @@ public interface TaskExecuteRunnable {
TaskInstance getTaskInstance();
TaskExecutionContext getTaskExecutionContext();
}

14
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java

@ -17,14 +17,13 @@
package org.apache.dolphinscheduler.server.master.runner.operator;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -43,16 +42,17 @@ public abstract class BaseTaskExecuteRunnableDispatchOperator implements TaskExe
@Override
public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) {
long remainTime = taskExecuteRunnable.getDelay(TimeUnit.SECONDS);
TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
if (remainTime > 0) {
long remainTimeMills =
DateUtils.getRemainTime(taskInstance.getFirstSubmitTime(), taskInstance.getDelayTime() * 60L) * 1_000;
if (remainTimeMills > 0) {
taskInstance.setState(TaskExecutionStatus.DELAY_EXECUTION);
taskInstanceDao.updateById(taskInstance);
log.info("Current taskInstance: {} is choose delay execution, delay time: {}/min, remainTime: {}/s",
log.info("Current taskInstance: {} is choose delay execution, delay time: {}/min, remainTime: {}/ms",
taskInstance.getName(),
taskInstance.getDelayTime(),
remainTime);
remainTimeMills);
}
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(taskExecuteRunnable);
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, remainTimeMills);
}
}

82
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.queue;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;
public class DelayEntry<V extends Comparable<V>> implements Delayed {
private final long delayTimeMills;
private final long triggerTimeMills;
@Getter
private final V data;
public DelayEntry(long delayTimeMills, V data) {
this.delayTimeMills = delayTimeMills;
this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills;
this.data = checkNotNull(data, "data is null");
}
@Override
public long getDelay(@NotNull TimeUnit unit) {
long remainTimeMills = triggerTimeMills - System.currentTimeMillis();
if (TimeUnit.MILLISECONDS.equals(unit)) {
return remainTimeMills;
}
return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(@NotNull Delayed o) {
DelayEntry<V> other = (DelayEntry<V>) o;
int delayTimeMillsCompareResult = Long.compare(delayTimeMills, other.delayTimeMills);
if (delayTimeMillsCompareResult != 0) {
return delayTimeMillsCompareResult;
}
if (data == null || other.data == null) {
return 0;
}
return data.compareTo(other.data);
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DelayEntry<?> that = (DelayEntry<?>) o;
return delayTimeMills == that.delayTimeMills && Objects.equals(data, that.data);
}
@Override
public int hashCode() {
return Objects.hash(delayTimeMills, data);
}
}

41
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.queue;
import java.util.concurrent.DelayQueue;
import lombok.SneakyThrows;
public class PriorityDelayQueue<V extends DelayEntry> {
private final DelayQueue<V> queue = new DelayQueue<>();
public void add(V v) {
queue.put(v);
}
@SneakyThrows
public V take() {
return queue.take();
}
public int size() {
return queue.size();
}
}

184
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
import org.apache.commons.lang3.time.DateUtils;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class GlobalTaskDispatchWaitingQueueTest {
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
@BeforeEach
public void setUp() {
globalTaskDispatchWaitingQueue = new GlobalTaskDispatchWaitingQueue();
}
@Test
void submitTaskExecuteRunnable() {
TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable();
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable);
Awaitility.await()
.atMost(Duration.ofSeconds(1))
.untilAsserted(
() -> Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
}
@Test
void testSubmitTaskExecuteRunnableWithDelay() {
TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable();
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, 3_000L);
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
Awaitility.await()
.atLeast(Duration.ofSeconds(2))
.atMost(Duration.ofSeconds(4))
.untilAsserted(
() -> Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
}
@Test
void takeTaskExecuteRunnable_NoElementShouldBlock() {
CompletableFuture<Void> completableFuture =
CompletableFuture.runAsync(() -> globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable());
assertThrowsExactly(ConditionTimeoutException.class,
() -> await()
.atLeast(Duration.ofSeconds(2))
.timeout(Duration.ofSeconds(3))
.until(completableFuture::isDone));
}
@Test
void takeTaskExecuteRunnable_withDifferentTaskInstancePriority() {
TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable();
taskExecuteRunnable1.getTaskInstance().setId(1);
taskExecuteRunnable1.getTaskInstance().setTaskInstancePriority(Priority.MEDIUM);
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1);
TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable();
taskExecuteRunnable2.getTaskInstance().setId(2);
taskExecuteRunnable2.getTaskInstance().setTaskInstancePriority(Priority.HIGH);
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2);
TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable();
taskExecuteRunnable3.getTaskInstance().setId(3);
taskExecuteRunnable3.getTaskInstance().setTaskInstancePriority(Priority.LOW);
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(2);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(1);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(3);
}
@Test
void takeTaskExecuteRunnable_withDifferentTaskGroupPriority() {
TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable();
taskExecuteRunnable1.getTaskInstance().setId(1);
taskExecuteRunnable1.getTaskInstance().setTaskGroupPriority(Priority.MEDIUM.getCode());
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1);
TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable();
taskExecuteRunnable2.getTaskInstance().setId(2);
taskExecuteRunnable2.getTaskInstance().setTaskGroupPriority(Priority.HIGH.getCode());
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2);
TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable();
taskExecuteRunnable3.getTaskInstance().setId(3);
taskExecuteRunnable3.getTaskInstance().setTaskGroupPriority(Priority.LOW.getCode());
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(3);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(1);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(2);
}
@Test
void takeTaskExecuteRunnable_withDifferentSubmitTime() {
Date now = new Date();
TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable();
taskExecuteRunnable1.getTaskInstance().setId(1);
taskExecuteRunnable1.getTaskInstance().setFirstSubmitTime(now);
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1);
TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable();
taskExecuteRunnable2.getTaskInstance().setId(2);
taskExecuteRunnable2.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now, 1));
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2);
TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable();
taskExecuteRunnable3.getTaskInstance().setId(3);
taskExecuteRunnable3.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now, -1));
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(3);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(1);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(2);
}
@Test
void getWaitingDispatchTaskNumber() {
Assertions.assertEquals(0, globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable();
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable);
Assertions.assertEquals(1, globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
}
private TaskExecuteRunnable createTaskExecuteRunnable() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setProcessInstancePriority(Priority.MEDIUM);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
taskInstance.setFirstSubmitTime(new Date());
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
return new DefaultTaskExecuteRunnable(processInstance, taskInstance, taskExecutionContext,
new TaskExecuteRunnableOperatorManager());
}
}

67
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java

@ -1,67 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.PriorityDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class PriorityDelayTaskExecuteRunnableTest {
@Test
public void testCompareTo() {
TaskExecuteRunnableOperatorManager taskOperatorManager = new TaskExecuteRunnableOperatorManager();
ProcessInstance workflowInstance = new ProcessInstance();
workflowInstance.setId(1);
workflowInstance.setProcessInstancePriority(Priority.HIGH);
TaskInstance t1 = new TaskInstance();
t1.setId(1);
t1.setTaskInstancePriority(Priority.HIGH);
TaskInstance t2 = new TaskInstance();
t2.setId(1);
t2.setTaskInstancePriority(Priority.HIGH);
TaskExecutionContext context1 = new TaskExecutionContext();
TaskExecutionContext context2 = new TaskExecutionContext();
PriorityDelayTaskExecuteRunnable p1 =
new DefaultTaskExecuteRunnable(workflowInstance, t1, context1, taskOperatorManager);
PriorityDelayTaskExecuteRunnable p2 =
new DefaultTaskExecuteRunnable(workflowInstance, t2, context2, taskOperatorManager);
Assertions.assertEquals(0, p1.compareTo(p2));
// the higher priority, the higher priority
t2.setTaskInstancePriority(Priority.MEDIUM);
Assertions.assertTrue(p1.compareTo(p2) < 0);
// the smaller dispatch fail times, the higher priority
context1.setDispatchFailTimes(1);
Assertions.assertTrue(p1.compareTo(p2) > 0);
}
}

35
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.queue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import com.google.common.truth.Truth;
class DelayEntryTest {
@Test
void getDelay() {
DelayEntry<String> delayEntry = new DelayEntry<>(1_000L, "Item");
Truth.assertThat(delayEntry.getDelay(TimeUnit.NANOSECONDS))
.isWithin(100)
.of(TimeUnit.NANOSECONDS.convert(1_000L, TimeUnit.MILLISECONDS));
}
}

12
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

@ -203,11 +203,6 @@ public class TaskExecutionContext implements Serializable {
*/
private String workerGroup;
/**
* delay execution time.
*/
private int delayTime;
/**
* current execution status
*/
@ -262,12 +257,9 @@ public class TaskExecutionContext implements Serializable {
private boolean logBufferEnable;
/**
* dispatch fail times
*/
private int dispatchFailTimes;
public void increaseDispatchFailTimes() {
this.dispatchFailTimes++;
public int increaseDispatchFailTimes() {
return ++dispatchFailTimes;
}
}

Loading…
Cancel
Save