Browse Source

[BUG-6543][Master] process instance state is always running when a failure task exists (#6547)

* fix-6543: process instance state is always running when failure task exists

* code style

* code style
3.0.0/version-upgrade
OS 3 years ago committed by GitHub
parent
commit
52a550b6ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
  3. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  4. 56
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  5. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  6. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

23
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.dao.entity; package org.apache.dolphinscheduler.dao.entity;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@ -25,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable; import java.io.Serializable;
@ -591,6 +594,26 @@ public class TaskInstance implements Serializable {
} }
} }
/**
 * Reports whether the wait between a failure and its retry has already elapsed.
 *
 * <p>The answer is {@code true} immediately when the state is not
 * {@code ExecutionStatus.FAILURE}, or when the id, the maximum retry count or the
 * retry interval is {@code 0}. Otherwise the seconds between now and the end time
 * (as computed by {@code DateUtils.differSec}) are compared against
 * {@code getRetryInterval() * SEC_2_MINUTES_TIME_UNIT}.
 * NOTE(review): presumably the retry interval is configured in minutes and the
 * constant converts it to seconds - confirm against {@code Constants}.
 *
 * @return {@code true} if no waiting applies or the elapsed time is strictly greater
 *         than the scaled retry interval; {@code false} while the task is still waiting
 */
public boolean retryTaskIntervalOverTime() {
    // A single short-circuit guard: the checks run in the same order as before,
    // and the first one that holds means there is nothing to wait for.
    if (getState() != ExecutionStatus.FAILURE
            || getId() == 0
            || getMaxRetryTimes() == 0
            || getRetryInterval() == 0) {
        return true;
    }

    long secondsSinceEnd = DateUtils.differSec(new Date(), getEndTime());
    // Still inside the retry interval -> false; strictly past it -> true.
    return secondsSinceEnd > getRetryInterval() * SEC_2_MINUTES_TIME_UNIT;
}
public Priority getTaskInstancePriority() { public Priority getTaskInstancePriority() {
return taskInstancePriority; return taskInstancePriority;
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -122,7 +122,7 @@ public class EventExecuteService extends Thread {
continue; continue;
} }
int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
logger.info("handle process instance : {} events, count:{}", logger.info("handle process instance : {} , events count:{}",
processInstanceId, processInstanceId,
workflowExecuteThread.eventSize()); workflowExecuteThread.eventSize());
logger.info("already exists handler process size:{}", this.eventHandlerMap.size()); logger.info("already exists handler process size:{}", this.eventHandlerMap.size());

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -96,6 +96,10 @@ public class StateWheelExecuteThread extends Thread {
return; return;
} }
} }
if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
processDependCheck(taskInstance);
taskInstanceCheckList.remove(taskInstance.getId());
}
if (taskInstance.isSubProcess() || taskInstance.isDependTask()) { if (taskInstance.isSubProcess() || taskInstance.isDependTask()) {
processDependCheck(taskInstance); processDependCheck(taskInstance);
} }

56
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -349,7 +349,7 @@ public class WorkflowExecuteThread implements Runnable {
private boolean taskStateChangeHandler(StateEvent stateEvent) { private boolean taskStateChangeHandler(StateEvent stateEvent) {
TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
if (stateEvent.getExecutionStatus().typeIsFinished()) { if (task.getState().typeIsFinished()) {
taskFinished(task); taskFinished(task);
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) { } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
@ -372,6 +372,18 @@ public class WorkflowExecuteThread implements Runnable {
task.getState()); task.getState());
if (task.taskCanRetry()) { if (task.taskCanRetry()) {
addTaskToStandByList(task); addTaskToStandByList(task);
if (!task.retryTaskIntervalOverTime()) {
logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}",
processInstance.getId(),
task.getId(),
task.getState(),
task.getRetryTimes(),
task.getMaxRetryTimes(),
task.getRetryInterval());
this.addTimeoutCheck(task);
} else {
submitStandByTask();
}
return; return;
} }
ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId()); ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId());
@ -650,20 +662,22 @@ public class WorkflowExecuteThread implements Runnable {
} }
private void addTimeoutCheck(TaskInstance taskInstance) { private void addTimeoutCheck(TaskInstance taskInstance) {
if (taskTimeoutCheckList.containsKey(taskInstance.getId())) {
return;
}
TaskDefinition taskDefinition = processService.findTaskDefinition( TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion() taskInstance.getTaskDefinitionVersion()
); );
taskInstance.setTaskDefine(taskDefinition); taskInstance.setTaskDefine(taskDefinition);
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) { if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
return; } else {
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
} }
} }
}
/** /**
* find task instance in db. * find task instance in db.
@ -1131,7 +1145,9 @@ public class WorkflowExecuteThread implements Runnable {
private void addTaskToStandByList(TaskInstance taskInstance) { private void addTaskToStandByList(TaskInstance taskInstance) {
logger.info("add task to stand by list: {}", taskInstance.getName()); logger.info("add task to stand by list: {}", taskInstance.getName());
try { try {
if (!readyToSubmitTaskQueue.contains(taskInstance)) {
readyToSubmitTaskQueue.put(taskInstance); readyToSubmitTaskQueue.put(taskInstance);
}
} catch (Exception e) { } catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue error, taskName: {}", taskInstance.getName(), e); logger.error("add task instance to readyToSubmitTaskQueue error, taskName: {}", taskInstance.getName(), e);
} }
@ -1191,36 +1207,12 @@ public class WorkflowExecuteThread implements Runnable {
this.addStateEvent(stateEvent); this.addStateEvent(stateEvent);
} }
} }
} }
public boolean workFlowFinish() { public boolean workFlowFinish() {
return this.processInstance.getState().typeIsFinished(); return this.processInstance.getState().typeIsFinished();
} }
/**
 * Reports whether the wait between a task failure and its retry has already elapsed.
 *
 * <p>The answer is {@code true} immediately when the task state is not
 * {@code ExecutionStatus.FAILURE}, or when its id, maximum retry count or retry
 * interval is {@code 0}. Otherwise the seconds between now and the task end time
 * (as computed by {@code DateUtils.differSec}) are compared against
 * {@code getRetryInterval() * SEC_2_MINUTES_TIME_UNIT}.
 * NOTE(review): presumably the retry interval is configured in minutes and the
 * constant converts it to seconds - confirm against {@code Constants}.
 *
 * @param taskInstance task instance to inspect
 * @return {@code true} if no waiting applies or the elapsed time is strictly greater
 *         than the scaled retry interval; {@code false} while the task is still waiting
 */
private boolean retryTaskIntervalOverTime(TaskInstance taskInstance) {
    // A single short-circuit guard: the checks run in the same order as before,
    // and the first one that holds means there is nothing to wait for.
    if (taskInstance.getState() != ExecutionStatus.FAILURE
            || taskInstance.getId() == 0
            || taskInstance.getMaxRetryTimes() == 0
            || taskInstance.getRetryInterval() == 0) {
        return true;
    }

    long secondsSinceEnd = DateUtils.differSec(new Date(), taskInstance.getEndTime());
    // Still inside the retry interval -> false; strictly past it -> true.
    return secondsSinceEnd > taskInstance.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT;
}
/** /**
* handling the list of tasks to be submitted * handling the list of tasks to be submitted
*/ */
@ -1252,12 +1244,16 @@ public class WorkflowExecuteThread implements Runnable {
} }
DependResult dependResult = getDependResultForTask(task); DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) { if (DependResult.SUCCESS == dependResult) {
if (retryTaskIntervalOverTime(task)) { if (task.retryTaskIntervalOverTime()) {
int originalId = task.getId();
TaskInstance taskInstance = submitTaskExec(task); TaskInstance taskInstance = submitTaskExec(task);
if (taskInstance == null) { if (taskInstance == null) {
this.taskFailedSubmit = true; this.taskFailedSubmit = true;
} else { } else {
removeTaskFromStandbyList(task); removeTaskFromStandbyList(task);
if (taskInstance.getId() != originalId) {
activeTaskProcessorMaps.remove(originalId);
}
} }
} }
} else if (DependResult.FAILED == dependResult) { } else if (DependResult.FAILED == dependResult) {

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1334,6 +1334,8 @@ public class ProcessService {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1); taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
} }
taskInstance.setSubmitTime(null); taskInstance.setSubmitTime(null);
taskInstance.setLogPath(null);
taskInstance.setExecutePath(null);
taskInstance.setStartTime(null); taskInstance.setStartTime(null);
taskInstance.setEndTime(null); taskInstance.setEndTime(null);
taskInstance.setFlag(Flag.YES); taskInstance.setFlag(Flag.YES);

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -111,15 +111,15 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
* @return true is contains * @return true is contains
*/ */
public boolean contains(TaskInstance taskInstance) { public boolean contains(TaskInstance taskInstance) {
return queue.contains(taskInstance); return this.contains(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
} }
public boolean contains(int taskId) { public boolean contains(long taskCode, int taskVersion) {
Iterator<TaskInstance> iterator = this.queue.iterator(); Iterator<TaskInstance> iterator = this.queue.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
TaskInstance taskInstance = iterator.next(); TaskInstance taskInstance = iterator.next();
if (taskId == taskInstance.getId()) { if (taskCode == taskInstance.getTaskCode()
&& taskVersion == taskInstance.getTaskDefinitionVersion()) {
return true; return true;
} }
} }

Loading…
Cancel
Save