@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread ;
import org.apache.dolphinscheduler.common.thread.Stopper ;
import org.apache.dolphinscheduler.common.utils.DateUtils ;
import org.apache.dolphinscheduler.common.utils.LoggerUtils ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Component ;
import lombok.NonNull ;
/ * *
* Check thread
* 1 . timeout task check
@ -110,10 +113,16 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
public void addProcess4TimeoutCheck ( ProcessInstance processInstance ) {
processInstanceTimeoutCheckList . add ( processInstance . getId ( ) ) ;
logger . info ( "Success add workflow instance into timeout check list" ) ;
}
public void removeProcess4TimeoutCheck ( ProcessInstance processInstance ) {
processInstanceTimeoutCheckList . remove ( processInstance . getId ( ) ) ;
boolean removeFlag = processInstanceTimeoutCheckList . remove ( processInstance . getId ( ) ) ;
if ( removeFlag ) {
logger . info ( "Success remove workflow instance from timeout check list" ) ;
} else {
logger . warn ( "Failed to remove workflow instance from timeout check list" ) ;
}
}
private void checkProcess4Timeout ( ) {
@ -121,106 +130,95 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
return ;
}
for ( Integer processInstanceId : processInstanceTimeoutCheckList ) {
if ( processInstanceId = = null ) {
continue ;
}
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager . getByProcessInstanceId ( processInstanceId ) ;
if ( workflowExecuteThread = = null ) {
logger . warn ( "can not find workflowExecuteThread, this check event will remove, processInstanceId:{}" , processInstanceId ) ;
logger . warn ( "Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list" ) ;
processInstanceTimeoutCheckList . remove ( processInstanceId ) ;
continue ;
}
ProcessInstance processInstance = workflowExecuteThread . getProcessInstance ( ) ;
if ( processInstance = = null ) {
logger . warn ( "Check workflow timeout failed, the workflowInstance is null" ) ;
continue ;
}
long timeRemain = DateUtils . getRemainTime ( processInstance . getStartTime ( ) , ( long ) processInstance . getTimeout ( ) * Constants . SEC_2_MINUTES_TIME_UNIT ) ;
if ( timeRemain < 0 ) {
logger . info ( "Workflow instance timeout, adding timeout event" ) ;
addProcessTimeoutEvent ( processInstance ) ;
processInstanceTimeoutCheckList . remove ( processInstance . getId ( ) ) ;
logger . info ( "Workflow instance timeout, added timeout event" ) ;
}
}
}
public void addTask4TimeoutCheck ( ProcessInstance processInstance , TaskInstance taskInstance ) {
public void addTask4TimeoutCheck ( @NonNull ProcessInstance processInstance , @NonNull TaskInstance taskInstance ) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey . getTaskInstanceKey ( processInstance , taskInstance ) ;
if ( taskInstanceKey = = null ) {
logger . error ( "taskInstanceKey is null" ) ;
return ;
}
logger . info ( "Adding task instance into timeout check list" ) ;
if ( taskInstanceTimeoutCheckList . contains ( taskInstanceKey ) ) {
logger . warn ( "Task instance is already in timeout check list" ) ;
return ;
}
TaskDefinition taskDefinition = taskInstance . getTaskDefine ( ) ;
if ( taskDefinition = = null ) {
logger . error ( "taskDefinition is null, taskId:{}" , taskInstance . getId ( ) ) ;
logger . error ( "Failed to add task instance into timeout check list, taskDefinition is null" ) ;
return ;
}
if ( TimeoutFlag . OPEN = = taskDefinition . getTimeoutFlag ( ) ) {
taskInstanceTimeoutCheckList . add ( taskInstanceKey ) ;
logger . info ( "Timeout flag is open, added task instance into timeout check list" ) ;
}
if ( taskInstance . isDependTask ( ) | | taskInstance . isSubProcess ( ) ) {
taskInstanceTimeoutCheckList . add ( taskInstanceKey ) ;
logger . info ( "task instance is dependTask orSubProcess, added task instance into timeout check list" ) ;
}
}
public void removeTask4TimeoutCheck ( ProcessInstance processInstance , TaskInstance taskInstance ) {
public void removeTask4TimeoutCheck ( @NonNull ProcessInstance processInstance , @NonNull TaskInstance taskInstance ) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey . getTaskInstanceKey ( processInstance , taskInstance ) ;
if ( taskInstanceKey = = null ) {
logger . error ( "taskInstanceKey is null" ) ;
return ;
}
taskInstanceTimeoutCheckList . remove ( taskInstanceKey ) ;
logger . info ( "remove task instance from timeout check list" ) ;
}
public void addTask4RetryCheck ( ProcessInstance processInstance , TaskInstance taskInstance ) {
public void addTask4RetryCheck ( @NonNull ProcessInstance processInstance , @NonNull TaskInstance taskInstance ) {
logger . info ( "Adding task instance into retry check list" ) ;
TaskInstanceKey taskInstanceKey = TaskInstanceKey . getTaskInstanceKey ( processInstance , taskInstance ) ;
if ( taskInstanceKey = = null ) {
logger . error ( "taskInstanceKey is null" ) ;
return ;
}
if ( taskInstanceRetryCheckList . contains ( taskInstanceKey ) ) {
logger . warn ( "Task instance is already in retry check list" ) ;
return ;
}
TaskDefinition taskDefinition = taskInstance . getTaskDefine ( ) ;
if ( taskDefinition = = null ) {
logger . error ( "taskDefinition is null, taskId:{}" , taskInstance . getId ( ) ) ;
logger . error ( "Add task instance into retry check list error, taskDefinition is null" ) ;
return ;
}
logger . debug ( "addTask4RetryCheck, taskCode:{}, processInstanceId:{}" , taskInstance . getTaskCode ( ) , taskInstance . getProcessInstanceId ( ) ) ;
taskInstanceRetryCheckList . add ( taskInstanceKey ) ;
logger . info ( "[WorkflowInstance-{}][TaskInstance-{}] Added task instance into retry check list" ,
processInstance . getId ( ) , taskInstance . getId ( ) ) ;
}
public void removeTask4RetryCheck ( ProcessInstance processInstance , TaskInstance taskInstance ) {
public void removeTask4RetryCheck ( @NonNull ProcessInstance processInstance , @NonNull TaskInstance taskInstance ) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey . getTaskInstanceKey ( processInstance , taskInstance ) ;
if ( taskInstanceKey = = null ) {
logger . error ( "taskInstanceKey is null" ) ;
return ;
}
taskInstanceRetryCheckList . remove ( taskInstanceKey ) ;
logger . info ( "remove task instance from retry check list" ) ;
}
public void addTask4StateCheck ( ProcessInstance processInstance , TaskInstance taskInstance ) {
public void addTask4StateCheck ( @NonNull ProcessInstance processInstance , @NonNull TaskInstance taskInstance ) {
logger . info ( "Adding task instance into state check list" ) ;
TaskInstanceKey taskInstanceKey = TaskInstanceKey . getTaskInstanceKey ( processInstance , taskInstance ) ;
if ( taskInstanceKey = = null ) {
logger . error ( "taskInstanceKey is null" ) ;
return ;
}
if ( taskInstanceStateCheckList . contains ( taskInstanceKey ) ) {
logger . warn ( "Task instance is already in state check list" ) ;
return ;
}
if ( taskInstance . isDependTask ( ) | | taskInstance . isSubProcess ( ) ) {
taskInstanceStateCheckList . add ( taskInstanceKey ) ;
logger . info ( "Added task instance into state check list" ) ;
}
}
public void removeTask4StateCheck ( ProcessInstance processInstance , TaskInstance taskInstance ) {
public void removeTask4StateCheck ( @NonNull ProcessInstance processInstance , @NonNull TaskInstance taskInstance ) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey . getTaskInstanceKey ( processInstance , taskInstance ) ;
if ( taskInstanceKey = = null ) {
logger . error ( "taskInstanceKey is null" ) ;
return ;
}
taskInstanceStateCheckList . remove ( taskInstanceKey ) ;
logger . info ( "Removed task instance from state check list" ) ;
}
private void checkTask4Timeout ( ) {
@ -228,30 +226,35 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
return ;
}
for ( TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList ) {
int processInstanceId = taskInstanceKey . getProcessInstanceId ( ) ;
long taskCode = taskInstanceKey . getTaskCode ( ) ;
try {
int processInstanceId = taskInstanceKey . getProcessInstanceId ( ) ;
LoggerUtils . setWorkflowInstanceIdMDC ( processInstanceId ) ;
long taskCode = taskInstanceKey . getTaskCode ( ) ;
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager . getByProcessInstanceId ( processInstanceId ) ;
if ( workflowExecuteThread = = null ) {
logger . warn ( "can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}" ,
processInstanceId , taskCode ) ;
taskInstanceTimeoutCheckList . remove ( taskInstanceKey ) ;
continue ;
}
Optional < TaskInstance > taskInstanceOptional = workflowExecuteThread . getActiveTaskInstanceByTaskCode ( taskCode ) ;
if ( ! taskInstanceOptional . isPresent ( ) ) {
logger . warn ( "can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}" ,
processInstanceId , taskCode ) ;
taskInstanceTimeoutCheckList . remove ( taskInstanceKey ) ;
continue ;
}
TaskInstance taskInstance = taskInstanceOptional . get ( ) ;
if ( TimeoutFlag . OPEN = = taskInstance . getTaskDefine ( ) . getTimeoutFlag ( ) ) {
long timeRemain = DateUtils . getRemainTime ( taskInstance . getStartTime ( ) , ( long ) taskInstance . getTaskDefine ( ) . getTimeout ( ) * Constants . SEC_2_MINUTES_TIME_UNIT ) ;
if ( timeRemain < 0 ) {
addTaskTimeoutEvent ( taskInstance ) ;
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager . getByProcessInstanceId ( processInstanceId ) ;
if ( workflowExecuteThread = = null ) {
logger . warn ( "Check task instance timeout failed, can not find workflowExecuteThread from cache manager, will remove this check task" ) ;
taskInstanceTimeoutCheckList . remove ( taskInstanceKey ) ;
continue ;
}
Optional < TaskInstance > taskInstanceOptional = workflowExecuteThread . getActiveTaskInstanceByTaskCode ( taskCode ) ;
if ( ! taskInstanceOptional . isPresent ( ) ) {
logger . warn ( "Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}"
+ "will remove this check task" , taskCode ) ;
taskInstanceTimeoutCheckList . remove ( taskInstanceKey ) ;
continue ;
}
TaskInstance taskInstance = taskInstanceOptional . get ( ) ;
if ( TimeoutFlag . OPEN = = taskInstance . getTaskDefine ( ) . getTimeoutFlag ( ) ) {
long timeRemain = DateUtils . getRemainTime ( taskInstance . getStartTime ( ) , ( long ) taskInstance . getTaskDefine ( ) . getTimeout ( ) * Constants . SEC_2_MINUTES_TIME_UNIT ) ;
if ( timeRemain < 0 ) {
logger . info ( "Task instance is timeout, adding task timeout event and remove the check" ) ;
addTaskTimeoutEvent ( taskInstance ) ;
taskInstanceTimeoutCheckList . remove ( taskInstanceKey ) ;
}
}
} finally {
LoggerUtils . removeWorkflowInstanceIdMDC ( ) ;
}
}
}
@ -264,41 +267,46 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
for ( TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList ) {
int processInstanceId = taskInstanceKey . getProcessInstanceId ( ) ;
long taskCode = taskInstanceKey . getTaskCode ( ) ;
try {
LoggerUtils . setWorkflowInstanceIdMDC ( processInstanceId ) ;
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager . getByProcessInstanceId ( processInstanceId ) ;
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager . getByProcessInstanceId ( processInstanceId ) ;
if ( workflowExecuteThread = = null ) {
logger . warn ( "can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}" ,
processInstanceId , taskCode ) ;
taskInstanceRetryCheckList . remove ( taskInstanceKey ) ;
continue ;
}
if ( workflowExecuteThread = = null ) {
logger . warn ( "Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
+ "will remove this check task" ) ;
taskInstanceRetryCheckList . remove ( taskInstanceKey ) ;
continue ;
}
Optional < TaskInstance > taskInstanceOptional = workflowExecuteThread . getRetryTaskInstanceByTaskCode ( taskCode ) ;
ProcessInstance processInstance = workflowExecuteThread . getProcessInstance ( ) ;
Optional < TaskInstance > taskInstanceOptional = workflowExecuteThread . getRetryTaskInstanceByTaskCode ( taskCode ) ;
ProcessInstance processInstance = workflowExecuteThread . getProcessInstance ( ) ;
if ( processInstance . getState ( ) = = ExecutionStatus . READY_STOP ) {
addProcessStopEvent ( processInstance ) ;
taskInstanceRetryCheckList . remove ( taskInstanceKey ) ;
break ;
}
if ( processInstance . getState ( ) = = ExecutionStatus . READY_STOP ) {
logger . warn ( "The process instance is ready to stop, will send process stop event and remove the check task" ) ;
addProcessStopEvent ( processInstance ) ;
taskInstanceRetryCheckList . remove ( taskInstanceKey ) ;
break ;
}
if ( ! taskInstanceOptional . isPresent ( ) ) {
logger . warn ( "can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}" ,
processInstanceId , taskCode ) ;
taskInstanceRetryCheckList . remove ( taskInstanceKey ) ;
continue ;
}
if ( ! taskInstanceOptional . isPresent ( ) ) {
logger . warn ( "Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check" ) ;
taskInstanceRetryCheckList . remove ( taskInstanceKey ) ;
continue ;
}
TaskInstance taskInstance = taskInstanceOptional . get ( ) ;
if ( taskInstance . retryTaskIntervalOverTime ( ) ) {
// reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
taskInstance . setEndTime ( null ) ;
taskInstance . setState ( ExecutionStatus . SUBMITTED_SUCCESS ) ;
TaskInstance taskInstance = taskInstanceOptional . get ( ) ;
if ( taskInstance . retryTaskIntervalOverTime ( ) ) {
// reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
taskInstance . setEndTime ( null ) ;
taskInstance . setState ( ExecutionStatus . SUBMITTED_SUCCESS ) ;
addTaskRetryEvent ( taskInstance ) ;
taskInstanceRetryCheckList . remove ( taskInstanceKey ) ;
addTaskRetryEvent ( taskInstance ) ;
taskInstanceRetryCheckList . remove ( taskInstanceKey ) ;
}
} finally {
LoggerUtils . removeWorkflowInstanceIdMDC ( ) ;
}
}
}
@ -311,25 +319,29 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
int processInstanceId = taskInstanceKey . getProcessInstanceId ( ) ;
long taskCode = taskInstanceKey . getTaskCode ( ) ;
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager . getByProcessInstanceId ( processInstanceId ) ;
if ( workflowExecuteThread = = null ) {
logger . warn ( "can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}" ,
processInstanceId , taskCode ) ;
taskInstanceStateCheckList . remove ( taskInstanceKey ) ;
continue ;
}
Optional < TaskInstance > taskInstanceOptional = workflowExecuteThread . getActiveTaskInstanceByTaskCode ( taskCode ) ;
if ( ! taskInstanceOptional . isPresent ( ) ) {
logger . warn ( "can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}" ,
processInstanceId , taskCode ) ;
taskInstanceStateCheckList . remove ( taskInstanceKey ) ;
continue ;
}
TaskInstance taskInstance = taskInstanceOptional . get ( ) ;
if ( taskInstance . getState ( ) . typeIsFinished ( ) ) {
continue ;
try {
LoggerUtils . setTaskInstanceIdMDC ( processInstanceId ) ;
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager . getByProcessInstanceId ( processInstanceId ) ;
if ( workflowExecuteThread = = null ) {
logger . warn ( "Task instance state check failed, can not find workflowExecuteThread from cache manager, will remove this check task" ) ;
taskInstanceStateCheckList . remove ( taskInstanceKey ) ;
continue ;
}
Optional < TaskInstance > taskInstanceOptional = workflowExecuteThread . getActiveTaskInstanceByTaskCode ( taskCode ) ;
if ( ! taskInstanceOptional . isPresent ( ) ) {
logger . warn (
"Task instance state check failed, can not find taskInstance from workflowExecuteThread, will remove this check event" ) ;
taskInstanceStateCheckList . remove ( taskInstanceKey ) ;
continue ;
}
TaskInstance taskInstance = taskInstanceOptional . get ( ) ;
if ( taskInstance . getState ( ) . typeIsFinished ( ) ) {
continue ;
}
addTaskStateChangeEvent ( taskInstance ) ;
} finally {
LoggerUtils . removeWorkflowInstanceIdMDC ( ) ;
}
addTaskStateChangeEvent ( taskInstance ) ;
}
}