|
|
@ -18,7 +18,6 @@ |
|
|
|
package org.apache.dolphinscheduler.server.master.runner.execute; |
|
|
|
package org.apache.dolphinscheduler.server.master.runner.execute; |
|
|
|
|
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.Delayed; |
|
|
|
import java.util.concurrent.Delayed; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
@ -38,7 +37,6 @@ public class AsyncTaskExecutionContext implements Delayed { |
|
|
|
private long currentStartTime; |
|
|
|
private long currentStartTime; |
|
|
|
private int executeTimes; |
|
|
|
private int executeTimes; |
|
|
|
private final long executeInterval; |
|
|
|
private final long executeInterval; |
|
|
|
private long timeout; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public AsyncTaskExecutionContext(@NonNull TaskExecutionContext taskExecutionContext, |
|
|
|
public AsyncTaskExecutionContext(@NonNull TaskExecutionContext taskExecutionContext, |
|
|
|
@NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction, |
|
|
|
@NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction, |
|
|
@ -48,29 +46,20 @@ public class AsyncTaskExecutionContext implements Delayed { |
|
|
|
this.asyncTaskCallbackFunction = asyncTaskCallbackFunction; |
|
|
|
this.asyncTaskCallbackFunction = asyncTaskCallbackFunction; |
|
|
|
this.currentStartTime = 0; |
|
|
|
this.currentStartTime = 0; |
|
|
|
this.executeTimes = 0; |
|
|
|
this.executeTimes = 0; |
|
|
|
if (TaskTimeoutStrategy.FAILED.equals(taskExecutionContext.getTaskTimeoutStrategy()) |
|
|
|
|
|
|
|
|| TaskTimeoutStrategy.WARNFAILED.equals(taskExecutionContext.getTaskTimeoutStrategy())) { |
|
|
|
|
|
|
|
// will timeout
|
|
|
|
|
|
|
|
this.timeout = taskExecutionContext.getStartTime() |
|
|
|
|
|
|
|
+ TimeUnit.SECONDS.toMillis(taskExecutionContext.getTaskTimeout()); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
this.timeout = TimeUnit.SECONDS.toMillis(Integer.MAX_VALUE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
this.executeInterval = Math.max(asyncTaskExecuteFunction.getAsyncTaskStateCheckInterval().toMillis(), 1000L); |
|
|
|
this.executeInterval = Math.max(asyncTaskExecuteFunction.getAsyncTaskStateCheckInterval().toMillis(), 1000L); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void refreshStartTime() { |
|
|
|
public synchronized void refreshStartTime() { |
|
|
|
if (executeTimes == 0) { |
|
|
|
if (executeTimes != 0) { |
|
|
|
// The first time doesn't have delay
|
|
|
|
// The first time doesn't have delay
|
|
|
|
executeTimes++; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
currentStartTime = System.currentTimeMillis(); |
|
|
|
currentStartTime = System.currentTimeMillis(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
executeTimes++; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public long getDelay(TimeUnit unit) { |
|
|
|
public long getDelay(TimeUnit unit) { |
|
|
|
long nextExecuteTimeDelay = Math.min(currentStartTime + executeInterval, timeout) - System.currentTimeMillis(); |
|
|
|
long nextExecuteTimeDelay = currentStartTime + executeInterval - System.currentTimeMillis(); |
|
|
|
return unit.convert(nextExecuteTimeDelay, TimeUnit.MILLISECONDS); |
|
|
|
return unit.convert(nextExecuteTimeDelay, TimeUnit.MILLISECONDS); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|