Browse Source

[Improvement][Master] Calculate the remainTime then we set the delay execution. (#15012)

* [#15007][fix][worker-server] fix bug task delay execution

* [#15007][fix][worker-server] fix bug task delay execution
    mvn spotless:apply

* [#15007][fix][worker-server] fix bug task delay execution
    worker fix  task delay logic

---------

Co-authored-by: 旺阳 <wang@lqwang.net>
3.2.1-prepare
dbac 1 year ago committed by GitHub
parent
commit
d683553324
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
  2. 9
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java

@ -62,19 +62,23 @@ public class LogicITaskInstanceDispatchOperationFunction
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext); MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext);
int delayTime = taskExecutionContext.getDelayTime();
if (delayTime > 0) {
// todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task // todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task
final long remainTime = final long remainTime =
DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime())); TimeUnit.SECONDS.toMillis(delayTime));
if (remainTime > 0) { if (remainTime > 0) {
log.info("Current taskInstance: {} is choosing delay execution, delay time: {}/ms, remainTime: {}/ms", log.info(
"Current taskInstance: {} is choosing delay execution, delay time: {}/ms, remainTime: {}/ms",
taskExecutionContext.getTaskName(), taskExecutionContext.getTaskName(),
TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime); TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
// todo: send delay execution message // todo: send delay execution message
return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId()); return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
} }
}
final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable =
masterTaskExecuteRunnableFactoryBuilder masterTaskExecuteRunnableFactoryBuilder
.createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType()) .createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType())

9
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java

@ -75,10 +75,14 @@ public class GlobalTaskInstanceDispatchQueueLooper extends BaseDaemonThread {
TaskExecutionContext taskExecutionContext = globalTaskInstanceDispatchQueue.take(); TaskExecutionContext taskExecutionContext = globalTaskInstanceDispatchQueue.take();
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId()); LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
int delayTime = taskExecutionContext.getDelayTime();
if (delayTime > 0) {
// delay task process // delay task process
long remainTime = long remainTime =
DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), DateUtils.getRemainTime(
taskExecutionContext.getDelayTime() * 60L); DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
delayTime * 60L);
if (remainTime > 0) { if (remainTime > 0) {
log.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime); log.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
@ -86,6 +90,7 @@ public class GlobalTaskInstanceDispatchQueueLooper extends BaseDaemonThread {
workerMessageSender.sendMessage(taskExecutionContext, workerMessageSender.sendMessage(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH); ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
} }
}
WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder
.createWorkerDelayTaskExecuteRunnableFactory( .createWorkerDelayTaskExecuteRunnableFactory(
taskExecutionContext, taskExecutionContext,

Loading…
Cancel
Save