@ -17,16 +17,10 @@
package org.apache.dolphinscheduler.server.master.runner ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES ;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING ;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR ;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN ;
import com.google.common.collect.Lists ;
import org.apache.commons.collections.CollectionUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.apache.commons.lang3.math.NumberUtils ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
import org.apache.dolphinscheduler.common.enums.FailureStrategy ;
@ -75,10 +69,8 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue ;
import org.apache.commons.collections.CollectionUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.apache.commons.lang3.math.NumberUtils ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.util.ArrayList ;
import java.util.Arrays ;
@ -96,10 +88,18 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue ;
import java.util.concurrent.atomic.AtomicBoolean ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import com.google.common.collect.Lists ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES ;
import static org.apache.dolphinscheduler.common.Constants.COMMA ;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP ;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING ;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR ;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN ;
/ * *
* Workflow execute task , used to execute a workflow instance .
@ -282,14 +282,14 @@ public class WorkflowExecuteRunnable implements Runnable {
public String getKey ( ) {
if ( StringUtils . isNotEmpty ( key )
| | this . processDefinition = = null ) {
| | this . processDefinition = = null ) {
return key ;
}
key = String . format ( "%d_%d_%d" ,
this . processDefinition . getCode ( ) ,
this . processDefinition . getVersion ( ) ,
this . processInstance . getId ( ) ) ;
this . processDefinition . getCode ( ) ,
this . processDefinition . getVersion ( ) ,
this . processInstance . getId ( ) ) ;
return key ;
}
@ -503,7 +503,7 @@ public class WorkflowExecuteRunnable implements Runnable {
} else {
ProcessInstance processInstance = this . processService . findProcessInstanceById ( nextTaskInstance . getProcessInstanceId ( ) ) ;
this . processService . sendStartTask2Master ( processInstance , nextTaskInstance . getId ( ) ,
org . apache . dolphinscheduler . remote . command . CommandType . TASK_WAKEUP_EVENT_REQUEST ) ;
org . apache . dolphinscheduler . remote . command . CommandType . TASK_WAKEUP_EVENT_REQUEST ) ;
}
}
}
@ -526,12 +526,12 @@ public class WorkflowExecuteRunnable implements Runnable {
waitToRetryTaskInstanceMap . put ( newTaskInstance . getTaskCode ( ) , newTaskInstance ) ;
if ( ! taskInstance . retryTaskIntervalOverTime ( ) ) {
logger . info ( "failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}" ,
processInstance . getId ( ) ,
newTaskInstance . getTaskCode ( ) ,
newTaskInstance . getState ( ) ,
newTaskInstance . getRetryTimes ( ) ,
newTaskInstance . getMaxRetryTimes ( ) ,
newTaskInstance . getRetryInterval ( ) ) ;
processInstance . getId ( ) ,
newTaskInstance . getTaskCode ( ) ,
newTaskInstance . getState ( ) ,
newTaskInstance . getRetryTimes ( ) ,
newTaskInstance . getMaxRetryTimes ( ) ,
newTaskInstance . getRetryInterval ( ) ) ;
stateWheelExecuteThread . addTask4TimeoutCheck ( processInstance , newTaskInstance ) ;
stateWheelExecuteThread . addTask4RetryCheck ( processInstance , newTaskInstance ) ;
} else {
@ -562,7 +562,7 @@ public class WorkflowExecuteRunnable implements Runnable {
logger . info ( "process instance update: {}" , processInstanceId ) ;
processInstance = processService . findProcessInstanceById ( processInstanceId ) ;
processDefinition = processService . findProcessDefinition ( processInstance . getProcessDefinitionCode ( ) ,
processInstance . getProcessDefinitionVersion ( ) ) ;
processInstance . getProcessDefinitionVersion ( ) ) ;
processInstance . setProcessDefinition ( processDefinition ) ;
}
@ -591,8 +591,8 @@ public class WorkflowExecuteRunnable implements Runnable {
public boolean checkProcessInstance ( StateEvent stateEvent ) {
if ( this . processInstance . getId ( ) ! = stateEvent . getProcessInstanceId ( ) ) {
logger . error ( "mismatch process instance id: {}, state event:{}" ,
this . processInstance . getId ( ) ,
stateEvent ) ;
this . processInstance . getId ( ) ,
stateEvent ) ;
return false ;
}
return true ;
@ -774,7 +774,12 @@ public class WorkflowExecuteRunnable implements Runnable {
if ( cmdParam . containsKey ( Constants . CMD_PARAM_RECOVERY_START_NODE_STRING ) ) {
cmdParam . remove ( Constants . CMD_PARAM_RECOVERY_START_NODE_STRING ) ;
}
cmdParam . replace ( CMDPARAM_COMPLEMENT_DATA_START_DATE , DateUtils . format ( scheduleDate , "yyyy-MM-dd HH:mm:ss" , null ) ) ;
if ( cmdParam . containsKey ( CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST ) ) {
cmdParam . replace ( CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST , cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST ) . substring ( cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST ) . indexOf ( COMMA ) + 1 ) ) ;
}
if ( cmdParam . containsKey ( CMDPARAM_COMPLEMENT_DATA_START_DATE ) ) {
cmdParam . replace ( CMDPARAM_COMPLEMENT_DATA_START_DATE , DateUtils . format ( scheduleDate , YYYY_MM_DD_HH_MM_SS , null ) ) ;
}
command . setCommandParam ( JSONUtils . toJsonString ( cmdParam ) ) ;
command . setTaskDependType ( processInstance . getTaskDependType ( ) ) ;
command . setFailureStrategy ( processInstance . getFailureStrategy ( ) ) ;
@ -794,7 +799,7 @@ public class WorkflowExecuteRunnable implements Runnable {
private boolean needComplementProcess ( ) {
if ( processInstance . isComplementData ( )
& & Flag . NO = = processInstance . getIsSubProcess ( ) ) {
& & Flag . NO = = processInstance . getIsSubProcess ( ) ) {
return true ;
}
return false ;
@ -874,7 +879,7 @@ public class WorkflowExecuteRunnable implements Runnable {
return ;
}
processDefinition = processService . findProcessDefinition ( processInstance . getProcessDefinitionCode ( ) ,
processInstance . getProcessDefinitionVersion ( ) ) ;
processInstance . getProcessDefinitionVersion ( ) ) ;
processInstance . setProcessDefinition ( processDefinition ) ;
List < TaskInstance > recoverNodeList = getStartTaskInstanceList ( processInstance . getCommandParam ( ) ) ;
@ -894,7 +899,7 @@ public class WorkflowExecuteRunnable implements Runnable {
List < String > recoveryNodeCodeList = getRecoveryNodeCodeList ( recoverNodeList ) ;
List < String > startNodeNameList = parseStartNodeName ( processInstance . getCommandParam ( ) ) ;
ProcessDag processDag = generateFlowDag ( taskNodeList ,
startNodeNameList , recoveryNodeCodeList , processInstance . getTaskDependType ( ) ) ;
startNodeNameList , recoveryNodeCodeList , processInstance . getTaskDependType ( ) ) ;
if ( processDag = = null ) {
logger . error ( "processDag is null" ) ;
return ;
@ -956,15 +961,24 @@ public class WorkflowExecuteRunnable implements Runnable {
if ( processInstance . isComplementData ( ) & & complementListDate . isEmpty ( ) ) {
Map < String , String > cmdParam = JSONUtils . toMap ( processInstance . getCommandParam ( ) ) ;
if ( cmdParam ! = null & & cmdParam . containsKey ( CMDPARAM_COMPLEMENT_DATA_START_DATE ) ) {
if ( cmdParam ! = null ) {
// reset global params while there are start parameters
setGlobalParamIfCommanded ( processDefinition , cmdParam ) ;
Date start = DateUtils . stringToDate ( cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_START_DATE ) ) ;
Date end = DateUtils . stringToDate ( cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_END_DATE ) ) ;
Date start = null ;
Date end = null ;
if ( cmdParam . containsKey ( CMDPARAM_COMPLEMENT_DATA_START_DATE ) & & cmdParam . containsKey ( CMDPARAM_COMPLEMENT_DATA_END_DATE ) ) {
start = DateUtils . stringToDate ( cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_START_DATE ) ) ;
end = DateUtils . stringToDate ( cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_END_DATE ) ) ;
}
List < Schedule > schedules = processService . queryReleaseSchedulerListByProcessDefinitionCode ( processInstance . getProcessDefinitionCode ( ) ) ;
if ( complementListDate . isEmpty ( ) & & needComplementProcess ( ) ) {
complementListDate = CronUtils . getSelfFireDateList ( start , end , schedules ) ;
if ( start ! = null & & end ! = null ) {
complementListDate = CronUtils . getSelfFireDateList ( start , end , schedules ) ;
}
if ( cmdParam . containsKey ( CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST ) ) {
complementListDate = CronUtils . getSelfScheduleDateList ( cmdParam ) ;
}
logger . info ( " process definition code:{} complement data: {}" ,
processInstance . getProcessDefinitionCode ( ) , complementListDate ) ;
@ -996,15 +1010,15 @@ public class WorkflowExecuteRunnable implements Runnable {
taskProcessor . init ( taskInstance , processInstance ) ;
if ( taskInstance . getState ( ) = = ExecutionStatus . RUNNING_EXECUTION
& & taskProcessor . getType ( ) . equalsIgnoreCase ( Constants . COMMON_TASK_TYPE ) ) {
& & taskProcessor . getType ( ) . equalsIgnoreCase ( Constants . COMMON_TASK_TYPE ) ) {
notifyProcessHostUpdate ( taskInstance ) ;
}
boolean submit = taskProcessor . action ( TaskAction . SUBMIT ) ;
if ( ! submit ) {
logger . error ( "process id:{} name:{} submit standby task id:{} name:{} failed!" ,
processInstance . getId ( ) , processInstance . getName ( ) ,
taskInstance . getId ( ) , taskInstance . getName ( ) ) ;
processInstance . getId ( ) , processInstance . getName ( ) ,
taskInstance . getId ( ) , taskInstance . getName ( ) ) ;
return null ;
}
@ -1475,10 +1489,10 @@ public class WorkflowExecuteRunnable implements Runnable {
* /
private ExecutionStatus runningState ( ExecutionStatus state ) {
if ( state = = ExecutionStatus . READY_STOP
| | state = = ExecutionStatus . READY_PAUSE
| | state = = ExecutionStatus . WAITING_THREAD
| | state = = ExecutionStatus . READY_BLOCK
| | state = = ExecutionStatus . DELAY_EXECUTION ) {
| | state = = ExecutionStatus . READY_PAUSE
| | state = = ExecutionStatus . WAITING_THREAD
| | state = = ExecutionStatus . READY_BLOCK
| | state = = ExecutionStatus . DELAY_EXECUTION ) {
// if the running task is not completed, the state remains unchanged
return state ;
} else {
@ -1514,8 +1528,8 @@ public class WorkflowExecuteRunnable implements Runnable {
}
if ( processInstance . getFailureStrategy ( ) = = FailureStrategy . CONTINUE ) {
return readyToSubmitTaskQueue . size ( ) = = 0
& & activeTaskProcessorMaps . size ( ) = = 0
& & waitToRetryTaskInstanceMap . size ( ) = = 0 ;
& & activeTaskProcessorMaps . size ( ) = = 0
& & waitToRetryTaskInstanceMap . size ( ) = = 0 ;
}
}
return false ;
@ -1546,9 +1560,9 @@ public class WorkflowExecuteRunnable implements Runnable {
List < TaskInstance > pauseList = getCompleteTaskByState ( ExecutionStatus . PAUSE ) ;
if ( CollectionUtils . isNotEmpty ( pauseList )
| | processInstance . isBlocked ( )
| | ! isComplementEnd ( )
| | readyToSubmitTaskQueue . size ( ) > 0 ) {
| | processInstance . isBlocked ( )
| | ! isComplementEnd ( )
| | readyToSubmitTaskQueue . size ( ) > 0 ) {
return ExecutionStatus . PAUSE ;
} else {
return ExecutionStatus . SUCCESS ;
@ -1613,9 +1627,9 @@ public class WorkflowExecuteRunnable implements Runnable {
List < TaskInstance > killList = getCompleteTaskByState ( ExecutionStatus . KILL ) ;
List < TaskInstance > failList = getCompleteTaskByState ( ExecutionStatus . FAILURE ) ;
if ( CollectionUtils . isNotEmpty ( stopList )
| | CollectionUtils . isNotEmpty ( killList )
| | CollectionUtils . isNotEmpty ( failList )
| | ! isComplementEnd ( ) ) {
| | CollectionUtils . isNotEmpty ( killList )
| | CollectionUtils . isNotEmpty ( failList )
| | ! isComplementEnd ( ) ) {
return ExecutionStatus . STOP ;
} else {
return ExecutionStatus . SUCCESS ;
@ -1673,10 +1687,10 @@ public class WorkflowExecuteRunnable implements Runnable {
ExecutionStatus state = getProcessInstanceState ( processInstance ) ;
if ( processInstance . getState ( ) ! = state ) {
logger . info (
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}" ,
processInstance . getId ( ) , processInstance . getName ( ) ,
processInstance . getState ( ) , state ,
processInstance . getCommandType ( ) ) ;
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}" ,
processInstance . getId ( ) , processInstance . getName ( ) ,
processInstance . getState ( ) , state ,
processInstance . getCommandType ( ) ) ;
processInstance . setState ( state ) ;
if ( state . typeIsFinished ( ) ) {
@ -1701,10 +1715,10 @@ public class WorkflowExecuteRunnable implements Runnable {
ExecutionStatus state = stateEvent . getExecutionStatus ( ) ;
if ( processInstance . getState ( ) ! = state ) {
logger . info (
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}" ,
processInstance . getId ( ) , processInstance . getName ( ) ,
processInstance . getState ( ) , state ,
processInstance . getCommandType ( ) ) ;
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}" ,
processInstance . getId ( ) , processInstance . getName ( ) ,
processInstance . getState ( ) , state ,
processInstance . getCommandType ( ) ) ;
processInstance . setState ( state ) ;
if ( state . typeIsFinished ( ) ) {
@ -1751,14 +1765,14 @@ public class WorkflowExecuteRunnable implements Runnable {
* /
private void removeTaskFromStandbyList ( TaskInstance taskInstance ) {
logger . info ( "remove task from stand by list, id: {} name:{}" ,
taskInstance . getId ( ) ,
taskInstance . getName ( ) ) ;
taskInstance . getId ( ) ,
taskInstance . getName ( ) ) ;
try {
readyToSubmitTaskQueue . remove ( taskInstance ) ;
} catch ( Exception e ) {
logger . error ( "remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}" ,
taskInstance . getId ( ) ,
taskInstance . getName ( ) , e ) ;
taskInstance . getId ( ) ,
taskInstance . getName ( ) , e ) ;
}
}
@ -1781,7 +1795,7 @@ public class WorkflowExecuteRunnable implements Runnable {
* /
private void killAllTasks ( ) {
logger . info ( "kill called on process instance id: {}, num: {}" , processInstance . getId ( ) ,
activeTaskProcessorMaps . size ( ) ) ;
activeTaskProcessorMaps . size ( ) ) ;
if ( readyToSubmitTaskQueue . size ( ) > 0 ) {
readyToSubmitTaskQueue . clear ( ) ;