@ -48,6 +48,9 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.common.base.Strings ;
/ * *
* Used to execute { @link WorkflowExecuteRunnable } , when
* /
@Component
public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@ -71,7 +74,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/ * *
* multi - thread filter , avoid handling workflow at the same time
* /
private ConcurrentHashMap < String , WorkflowExecuteRunnable > multiThreadFilterMap = new ConcurrentHashMap ( ) ;
private ConcurrentHashMap < String , WorkflowExecuteRunnable > multiThreadFilterMap = new ConcurrentHashMap < > ( ) ;
@PostConstruct
private void init ( ) {
@ -94,7 +97,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
}
/ * *
* start workflow
* Start the given workflow .
* /
public void startWorkflow ( WorkflowExecuteRunnable workflowExecuteThread ) {
ProcessInstanceMetrics . incProcessInstanceSubmit ( ) ;
@ -102,13 +105,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
}
/ * *
* execute workflow
* Handle the events belong to the given workflow .
* /
public void executeEvent ( WorkflowExecuteRunnable workflowExecuteThread ) {
if ( ! workflowExecuteThread . isStart ( ) | | workflowExecuteThread . eventSize ( ) = = 0 ) {
return ;
}
if ( multiThreadFilterMap . containsKey ( workflowExecuteThread . getKey ( ) ) ) {
logger . warn ( "The workflow:{} has been executed by another thread" , workflowExecuteThread . getKey ( ) ) ;
return ;
}
multiThreadFilterMap . put ( workflowExecuteThread . getKey ( ) , workflowExecuteThread ) ;
@ -123,8 +127,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onSuccess ( Object result ) {
// if an exception occurs, first, the error message cannot be printed in the log;
// secondly, the `multiThreadFilterMap` cannot remove the `workflowExecuteThread`, resulting in the state of process instance cannot be changed and memory leak
try {
if ( workflowExecuteThread . workFlowFinish ( ) ) {
stateWheelExecuteThread . removeProcess4TimeoutCheck ( workflowExecuteThread . getProcessInstance ( ) ) ;
@ -134,9 +136,11 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
}
} catch ( Exception e ) {
logger . error ( "handle events {} success, but notify changed error" , processInstanceId , e ) ;
}
} finally {
// make sure the process has been removed from multiThreadFilterMap
multiThreadFilterMap . remove ( workflowExecuteThread . getKey ( ) ) ;
}
}
} ) ;
}