|
|
@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; |
|
|
|
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; |
|
|
|
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; |
|
|
|
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; |
|
|
|
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; |
|
|
|
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; |
|
|
|
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; |
|
|
|
import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatue; |
|
|
|
import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatus; |
|
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
|
|
|
|
|
|
|
@ -63,13 +63,13 @@ public class WorkflowStartEventHandler implements WorkflowEventHandler { |
|
|
|
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit", |
|
|
|
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit", |
|
|
|
processInstance.getProcessDefinitionCode().toString()); |
|
|
|
processInstance.getProcessDefinitionCode().toString()); |
|
|
|
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool) |
|
|
|
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool) |
|
|
|
.thenAccept(workflowSubmitStatue -> { |
|
|
|
.thenAccept(workflowSubmitStatus -> { |
|
|
|
if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) { |
|
|
|
if (WorkflowSubmitStatus.SUCCESS == workflowSubmitStatus) { |
|
|
|
log.info("Success submit the workflow instance"); |
|
|
|
log.info("Success submit the workflow instance"); |
|
|
|
if (processInstance.getTimeout() > 0) { |
|
|
|
if (processInstance.getTimeout() > 0) { |
|
|
|
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); |
|
|
|
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) { |
|
|
|
} else if (WorkflowSubmitStatus.FAILED == workflowSubmitStatus) { |
|
|
|
log.error( |
|
|
|
log.error( |
|
|
|
"Failed to submit the workflow instance, will resend the workflow start event: {}", |
|
|
|
"Failed to submit the workflow instance, will resend the workflow start event: {}", |
|
|
|
workflowEvent); |
|
|
|
workflowEvent); |
|
|
|