Browse Source

[BUG][Master] process cannot finish and its status is always running (#6886)

* fix 6882: process cannot finish and its status is always running

* fix 6882: process cannot finish and its status is always running
3.0.0/version-upgrade
OS 3 years ago committed by GitHub
parent
commit
653eae2419
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -118,6 +118,7 @@ public class EventExecuteService extends Thread {
for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) { for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
if (workflowExecuteThread.eventSize() == 0 if (workflowExecuteThread.eventSize() == 0
|| StringUtils.isEmpty(workflowExecuteThread.getKey()) || StringUtils.isEmpty(workflowExecuteThread.getKey())
|| !workflowExecuteThread.isStart()
|| eventHandlerMap.containsKey(workflowExecuteThread.getKey())) { || eventHandlerMap.containsKey(workflowExecuteThread.getKey())) {
continue; continue;
} }
@ -186,12 +187,13 @@ public class EventExecuteService extends Thread {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId() processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId()
); );
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
} }
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(Throwable throwable) {
logger.info("handle events {} failed.", processInstanceId);
logger.info("handle events failed.", throwable);
} }
}; };
Futures.addCallback(future, futureCallback, this.listeningExecutorService); Futures.addCallback(future, futureCallback, this.listeningExecutorService);

Loading…
Cancel
Save