Browse Source

[improve-#13045] after a submit failure, stop the processInstance to avoid an endless loop (#13051)

* [improve-#13045] add max submit number of workflow

* [fix-13405] remove max times

* Update dolphinscheduler-master/src/main/resources/application.yaml

Co-authored-by: Wenjun Ruan <wenjun@apache.org>

---------

Co-authored-by: fuchanghai <‘2875334588@qq.com’>
Co-authored-by: Wenjun Ruan <wenjun@apache.org>
3.2.0-release
fuchanghai 2 years ago committed by GitHub
parent
commit
701d67c831
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
  2. 15
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
  3. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java

@ -27,7 +27,8 @@ public enum StateEventType {
TASK_TIMEOUT(3, "task timeout"), TASK_TIMEOUT(3, "task timeout"),
WAKE_UP_TASK_GROUP(4, "wait task group"), WAKE_UP_TASK_GROUP(4, "wait task group"),
TASK_RETRY(5, "task retry"), TASK_RETRY(5, "task retry"),
PROCESS_BLOCKED(6, "process blocked"); PROCESS_BLOCKED(6, "process blocked"),
PROCESS_SUBMIT_FAILED(7, "process submit failed");
StateEventType(int code, String descp) { StateEventType(int code, String descp) {
this.code = code; this.code = code;

15
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.event; package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
@ -66,11 +68,16 @@ public class WorkflowStartEventHandler implements WorkflowEventHandler {
if (processInstance.getTimeout() > 0) { if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
} }
} else { } else if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
// submit failed will resend the event to workflow event queue log.error(
log.error("Failed to submit the workflow instance, will resend the workflow start event: {}", "Failed to submit the workflow instance, will resend the workflow start event: {}",
workflowEvent); workflowEvent);
workflowEventQueue.addEvent(workflowEvent); WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()
.processInstanceId(processInstance.getId())
.type(StateEventType.PROCESS_SUBMIT_FAILED)
.status(WorkflowExecutionStatus.FAILURE)
.build();
workflowExecuteRunnable.addStateEvent(stateEvent);
} }
}); });
} }

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java

@ -57,6 +57,9 @@ public class WorkflowStateEventHandler implements StateEventHandler {
return true; return true;
} }
if (workflowStateEvent.getStatus().isFinished()) { if (workflowStateEvent.getStatus().isFinished()) {
if (workflowStateEvent.getType().equals(StateEventType.PROCESS_SUBMIT_FAILED)) {
workflowExecuteRunnable.updateProcessInstanceState(workflowStateEvent);
}
workflowExecuteRunnable.endProcess(); workflowExecuteRunnable.endProcess();
} }
if (processInstance.getState().isReadyStop()) { if (processInstance.getState().isReadyStop()) {

Loading…
Cancel
Save