Browse Source

[Fix-8337][Master] Process instance can not be kill when task is failure and can be retry (#8387)

* fix bug_8337

* fix bug_8337

* fix bug_8337

* fix bug_8337

* fix bug_8337

* fix bug_8337

* fix bug_8337

* test

* fix dev_bug_8337

* fix dev_bug_8337

* fix dev_bug_8337

* fix bug_8337
3.0.0/version-upgrade
xiangzihao 3 years ago committed by GitHub
parent
commit
08f642c756
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  2. 45
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  3. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  4. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

19
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -250,18 +250,29 @@ public class StateWheelExecuteThread extends Thread {
if (taskInstanceRetryCheckList.isEmpty()) {
return;
}
for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
addProcessStopEvent(processInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey);
break;
}
if (taskInstance == null) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
@ -320,6 +331,14 @@ public class StateWheelExecuteThread extends Thread {
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
private void addProcessStopEvent(ProcessInstance processInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(ExecutionStatus.STOP);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
private void addTaskRetryEvent(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_RETRY);

45
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -441,7 +441,7 @@ public class WorkflowExecuteThread {
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else if (taskInstance.taskCanRetry()) {
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
@ -577,6 +577,7 @@ public class WorkflowExecuteThread {
logger.error("task instance id null, state event:{}", stateEvent);
return false;
}
if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) {
logger.error("mismatch task instance id, event:{}", stateEvent);
return false;
@ -636,6 +637,12 @@ public class WorkflowExecuteThread {
private boolean processStateChangeHandler(StateEvent stateEvent) {
try {
logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
this.updateProcessInstanceState(stateEvent);
return true;
}
if (processComplementData()) {
return true;
}
@ -1441,10 +1448,6 @@ public class WorkflowExecuteThread {
// active task and retry task exists
return runningState(state);
}
// process failure
if (processFailed()) {
return ExecutionStatus.FAILURE;
}
// waiting thread
if (hasWaitingThreadTask()) {
@ -1460,8 +1463,10 @@ public class WorkflowExecuteThread {
if (state == ExecutionStatus.READY_STOP) {
List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
List<TaskInstance> faillist = getCompleteTaskByState(ExecutionStatus.FAILURE);
if (CollectionUtils.isNotEmpty(stopList)
|| CollectionUtils.isNotEmpty(killList)
|| CollectionUtils.isNotEmpty(faillist)
|| !isComplementEnd()) {
return ExecutionStatus.STOP;
} else {
@ -1469,6 +1474,11 @@ public class WorkflowExecuteThread {
}
}
// process failure
if (processFailed()) {
return ExecutionStatus.FAILURE;
}
// success
if (state == ExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
@ -1534,6 +1544,26 @@ public class WorkflowExecuteThread {
}
}
/**
* stateEvent's execution status as process instance state
*/
private void updateProcessInstanceState(StateEvent stateEvent) {
ExecutionStatus state = stateEvent.getExecutionStatus();
if (processInstance.getState() != state) {
logger.info(
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(), processInstance.getName(),
processInstance.getState(), state,
processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
processInstance.setEndTime(new Date());
}
processService.updateProcessInstance(processInstance);
}
}
/**
* get task dependency result
*
@ -1607,6 +1637,11 @@ public class WorkflowExecuteThread {
private void killAllTasks() {
logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
activeTaskProcessorMaps.size());
if (readyToSubmitTaskQueue.size() > 0) {
readyToSubmitTaskQueue.clear();
}
for (long taskCode : activeTaskProcessorMaps.keySet()) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
Integer taskInstanceId = validTaskMap.get(taskCode);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1162,7 +1162,7 @@ public class ProcessService {
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
logger.error("task commit to mysql failed", e);
logger.error("task commit to db failed", e);
}
retryTimes += 1;
}

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -104,6 +104,14 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
return queue.size();
}
/**
* clear task
*
*/
public void clear() {
queue.clear();
}
/**
* whether contains the task instance
*

Loading…
Cancel
Save