Browse Source

fix timeout (#7224)

Co-authored-by: caishunfeng <534328519@qq.com>
2.0.1-release
wind 3 years ago committed by GitHub
parent
commit
30848478bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  2. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -96,7 +96,7 @@ public class StateWheelExecuteThread extends Thread {
for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) {
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 >= timeRemain) {
if (timeRemain < 0) {
addTaskTimeoutEvent(taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstance.getId());
}
@ -127,7 +127,7 @@ public class StateWheelExecuteThread extends Thread {
for (ProcessInstance processInstance : this.processInstanceTimeoutCheckList.values()) {
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 >= timeRemain) {
if (timeRemain < 0) {
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
}
@ -162,7 +162,7 @@ public class StateWheelExecuteThread extends Thread {
}
private void addEvent(StateEvent stateEvent) {
if (!processInstanceExecMaps.contains(stateEvent.getProcessInstanceId())) {
if (!processInstanceExecMaps.containsKey(stateEvent.getProcessInstanceId())) {
return;
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId());

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -669,12 +669,14 @@ public class WorkflowExecuteThread implements Runnable {
if (taskTimeoutCheckList.containsKey(taskInstance.getId())) {
return;
}
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion()
);
taskInstance.setTaskDefine(taskDefinition);
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
if (taskInstance.getTaskDefine() == null) {
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion()
);
taskInstance.setTaskDefine(taskDefinition);
}
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
@ -686,12 +688,13 @@ public class WorkflowExecuteThread implements Runnable {
if (taskRetryCheckList.containsKey(taskInstance.getId())) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
if (taskInstance.getTaskDefine() == null) {
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion()
);
taskInstance.setTaskDefine(taskDefinition);
}
if (taskInstance.taskCanRetry()) {
this.taskRetryCheckList.put(taskInstance.getId(), taskInstance);
}

Loading…
Cancel
Save