From d6835533243e692f4d23b7e6923b2bf4d748ee52 Mon Sep 17 00:00:00 2001 From: dbac <642826683@qq.com> Date: Wed, 11 Oct 2023 15:32:07 +0800 Subject: [PATCH] [Improvement][Master] Calculate the remainTime then we set the delay execution. (#15012) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [#15007][fix][worker-server] fix bug task delay execution * [#15007][fix][worker-server] fix bug task delay execution mvn spotless:apply * [#15007][fix][worker-server] fix bug task delay execution worker fix task delay logic --------- Co-authored-by: 旺阳 --- ...TaskInstanceDispatchOperationFunction.java | 28 +++++++++++-------- ...GlobalTaskInstanceDispatchQueueLooper.java | 25 ++++++++++------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java index d97cdea0aa..c60e860aa7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java @@ -62,19 +62,23 @@ public class LogicITaskInstanceDispatchOperationFunction LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext); - // todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task - final long remainTime = - DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), - TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime())); - if (remainTime > 0) { - log.info("Current taskInstance: {} is choosing delay execution, delay time: {}/ms, remainTime: {}/ms", - taskExecutionContext.getTaskName(), - TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime); - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); - // todo: send delay execution message - return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId()); - } + int delayTime = taskExecutionContext.getDelayTime(); + if (delayTime > 0) { + // todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task + final long remainTime = + DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), + TimeUnit.SECONDS.toMillis(delayTime)); + if (remainTime > 0) { + log.info( + "Current taskInstance: {} is choosing delay execution, delay time: {}/ms, remainTime: {}/ms", + taskExecutionContext.getTaskName(), + TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime); + taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); + // todo: send delay execution message + return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId()); + } + } final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = masterTaskExecuteRunnableFactoryBuilder .createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType()) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java index eb0e33d389..654a1dd9fe 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java @@ -75,16 +75,21 @@ public class GlobalTaskInstanceDispatchQueueLooper extends BaseDaemonThread { TaskExecutionContext taskExecutionContext = globalTaskInstanceDispatchQueue.take(); LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId()); - // delay task process - long remainTime = - DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), - taskExecutionContext.getDelayTime() * 60L); - if (remainTime > 0) { - log.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime); - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); - // todo: use delay running event - workerMessageSender.sendMessage(taskExecutionContext, - ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH); + + int delayTime = taskExecutionContext.getDelayTime(); + if (delayTime > 0) { + // delay task process + long remainTime = + DateUtils.getRemainTime( + DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), + delayTime * 60L); + if (remainTime > 0) { + log.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime); + taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); + // todo: use delay running event + workerMessageSender.sendMessage(taskExecutionContext, + ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH); + } } WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder .createWorkerDelayTaskExecuteRunnableFactory(