@ -28,14 +28,14 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.thread.Stopper ;
import org.apache.dolphinscheduler.common.thread.Stopper ;
import org.apache.dolphinscheduler.common.thread.ThreadUtils ;
import org.apache.dolphinscheduler.common.thread.ThreadUtils ;
import org.apache.dolphinscheduler.common.utils.* ;
import org.apache.dolphinscheduler.common.utils.* ;
import org.apache.dolphinscheduler.dao.ProcessDao ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.Schedule ;
import org.apache.dolphinscheduler.dao.entity.Schedule ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
import org.apache.dolphinscheduler.dao.utils.DagHelper ;
import org.apache.dolphinscheduler.dao.utils.DagHelper ;
import org.apache.dolphinscheduler.dao.utils.cron.CronUtils ;
import org.apache.dolphinscheduler.server.master.config.MasterConfig ;
import org.apache.dolphinscheduler.server.master.config.MasterConfig ;
import org.apache.dolphinscheduler.server.utils.AlertManager ;
import org.apache.dolphinscheduler.server.utils.AlertManager ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils ;
import org.slf4j.Logger ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.slf4j.LoggerFactory ;
@ -124,9 +124,9 @@ public class MasterExecThread implements Runnable {
private DAG < String , TaskNode , TaskNodeRelation > dag ;
private DAG < String , TaskNode , TaskNodeRelation > dag ;
/ * *
/ * *
* process dao
* process service
* /
* /
private ProcessDao processDao ;
private ProcessService processService ;
/ * *
/ * *
* master config
* master config
@ -136,10 +136,10 @@ public class MasterExecThread implements Runnable {
/ * *
/ * *
* constructor of MasterExecThread
* constructor of MasterExecThread
* @param processInstance process instance
* @param processInstance process instance
* @param processDao process dao
* @param processService process dao
* /
* /
public MasterExecThread ( ProcessInstance processInstance , ProcessDao processDao ) {
public MasterExecThread ( ProcessInstance processInstance , ProcessService processService ) {
this . processDao = processDao ;
this . processService = processService ;
this . processInstance = processInstance ;
this . processInstance = processInstance ;
this . masterConfig = SpringApplicationContext . getBean ( MasterConfig . class ) ;
this . masterConfig = SpringApplicationContext . getBean ( MasterConfig . class ) ;
@ -177,7 +177,7 @@ public class MasterExecThread implements Runnable {
logger . error ( "process execute failed, process id:{}" , processInstance . getId ( ) ) ;
logger . error ( "process execute failed, process id:{}" , processInstance . getId ( ) ) ;
processInstance . setState ( ExecutionStatus . FAILURE ) ;
processInstance . setState ( ExecutionStatus . FAILURE ) ;
processInstance . setEndTime ( new Date ( ) ) ;
processInstance . setEndTime ( new Date ( ) ) ;
processDao . updateProcessInstance ( processInstance ) ;
processService . updateProcessInstance ( processInstance ) ;
} finally {
} finally {
taskExecService . shutdown ( ) ;
taskExecService . shutdown ( ) ;
// post handle
// post handle
@ -205,11 +205,11 @@ public class MasterExecThread implements Runnable {
Date startDate = DateUtils . getScheduleDate ( cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_START_DATE ) ) ;
Date startDate = DateUtils . getScheduleDate ( cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_START_DATE ) ) ;
Date endDate = DateUtils . getScheduleDate ( cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_END_DATE ) ) ;
Date endDate = DateUtils . getScheduleDate ( cmdParam . get ( CMDPARAM_COMPLEMENT_DATA_END_DATE ) ) ;
processDao . saveProcessInstance ( processInstance ) ;
processService . saveProcessInstance ( processInstance ) ;
// get schedules
// get schedules
int processDefinitionId = processInstance . getProcessDefinitionId ( ) ;
int processDefinitionId = processInstance . getProcessDefinitionId ( ) ;
List < Schedule > schedules = processDao . queryReleaseSchedulerListByProcessDefinitionId ( processDefinitionId ) ;
List < Schedule > schedules = processService . queryReleaseSchedulerListByProcessDefinitionId ( processDefinitionId ) ;
List < Date > listDate = Lists . newLinkedList ( ) ;
List < Date > listDate = Lists . newLinkedList ( ) ;
if ( ! CollectionUtils . isEmpty ( schedules ) ) {
if ( ! CollectionUtils . isEmpty ( schedules ) ) {
for ( Schedule schedule : schedules ) {
for ( Schedule schedule : schedules ) {
@ -223,7 +223,7 @@ public class MasterExecThread implements Runnable {
iterator = listDate . iterator ( ) ;
iterator = listDate . iterator ( ) ;
scheduleDate = iterator . next ( ) ;
scheduleDate = iterator . next ( ) ;
processInstance . setScheduleTime ( scheduleDate ) ;
processInstance . setScheduleTime ( scheduleDate ) ;
processDao . updateProcessInstance ( processInstance ) ;
processService . updateProcessInstance ( processInstance ) ;
} else {
} else {
scheduleDate = processInstance . getScheduleTime ( ) ;
scheduleDate = processInstance . getScheduleTime ( ) ;
if ( scheduleDate = = null ) {
if ( scheduleDate = = null ) {
@ -239,7 +239,7 @@ public class MasterExecThread implements Runnable {
logger . error ( "process {} dag is null, please check out parameters" ,
logger . error ( "process {} dag is null, please check out parameters" ,
processInstance . getId ( ) ) ;
processInstance . getId ( ) ) ;
processInstance . setState ( ExecutionStatus . SUCCESS ) ;
processInstance . setState ( ExecutionStatus . SUCCESS ) ;
processDao . updateProcessInstance ( processInstance ) ;
processService . updateProcessInstance ( processInstance ) ;
return ;
return ;
}
}
@ -281,10 +281,10 @@ public class MasterExecThread implements Runnable {
processInstance . setCommandParam ( JSONUtils . toJson ( cmdParam ) ) ;
processInstance . setCommandParam ( JSONUtils . toJson ( cmdParam ) ) ;
}
}
List < TaskInstance > taskInstanceList = processDao . findValidTaskListByProcessId ( processInstance . getId ( ) ) ;
List < TaskInstance > taskInstanceList = processService . findValidTaskListByProcessId ( processInstance . getId ( ) ) ;
for ( TaskInstance taskInstance : taskInstanceList ) {
for ( TaskInstance taskInstance : taskInstanceList ) {
taskInstance . setFlag ( Flag . NO ) ;
taskInstance . setFlag ( Flag . NO ) ;
processDao . updateTaskInstance ( taskInstance ) ;
processService . updateTaskInstance ( taskInstance ) ;
}
}
processInstance . setState ( ExecutionStatus . RUNNING_EXEUTION ) ;
processInstance . setState ( ExecutionStatus . RUNNING_EXEUTION ) ;
processInstance . setGlobalParams ( ParameterUtils . curingGlobalParams (
processInstance . setGlobalParams ( ParameterUtils . curingGlobalParams (
@ -292,7 +292,7 @@ public class MasterExecThread implements Runnable {
processInstance . getProcessDefinition ( ) . getGlobalParamList ( ) ,
processInstance . getProcessDefinition ( ) . getGlobalParamList ( ) ,
CommandType . COMPLEMENT_DATA , processInstance . getScheduleTime ( ) ) ) ;
CommandType . COMPLEMENT_DATA , processInstance . getScheduleTime ( ) ) ) ;
processDao . saveProcessInstance ( processInstance ) ;
processService . saveProcessInstance ( processInstance ) ;
}
}
// flow end
// flow end
@ -320,11 +320,11 @@ public class MasterExecThread implements Runnable {
* /
* /
private void endProcess ( ) {
private void endProcess ( ) {
processInstance . setEndTime ( new Date ( ) ) ;
processInstance . setEndTime ( new Date ( ) ) ;
processDao . updateProcessInstance ( processInstance ) ;
processService . updateProcessInstance ( processInstance ) ;
if ( processInstance . getState ( ) . typeIsWaittingThread ( ) ) {
if ( processInstance . getState ( ) . typeIsWaittingThread ( ) ) {
processDao . createRecoveryWaitingThreadCommand ( null , processInstance ) ;
processService . createRecoveryWaitingThreadCommand ( null , processInstance ) ;
}
}
List < TaskInstance > taskInstances = processDao . findValidTaskListByProcessId ( processInstance . getId ( ) ) ;
List < TaskInstance > taskInstances = processService . findValidTaskListByProcessId ( processInstance . getId ( ) ) ;
alertManager . sendAlertProcessInstance ( processInstance , taskInstances ) ;
alertManager . sendAlertProcessInstance ( processInstance , taskInstances ) ;
}
}
@ -361,7 +361,7 @@ public class MasterExecThread implements Runnable {
dependFailedTask . clear ( ) ;
dependFailedTask . clear ( ) ;
completeTaskList . clear ( ) ;
completeTaskList . clear ( ) ;
errorTaskList . clear ( ) ;
errorTaskList . clear ( ) ;
List < TaskInstance > taskInstanceList = processDao . findValidTaskListByProcessId ( processInstance . getId ( ) ) ;
List < TaskInstance > taskInstanceList = processService . findValidTaskListByProcessId ( processInstance . getId ( ) ) ;
for ( TaskInstance task : taskInstanceList ) {
for ( TaskInstance task : taskInstanceList ) {
if ( task . isTaskComplete ( ) ) {
if ( task . isTaskComplete ( ) ) {
completeTaskList . put ( task . getName ( ) , task ) ;
completeTaskList . put ( task . getName ( ) , task ) ;
@ -417,7 +417,7 @@ public class MasterExecThread implements Runnable {
* @return TaskInstance
* @return TaskInstance
* /
* /
private TaskInstance findTaskIfExists ( String taskName ) {
private TaskInstance findTaskIfExists ( String taskName ) {
List < TaskInstance > taskInstanceList = processDao . findValidTaskListByProcessId ( this . processInstance . getId ( ) ) ;
List < TaskInstance > taskInstanceList = processService . findValidTaskListByProcessId ( this . processInstance . getId ( ) ) ;
for ( TaskInstance taskInstance : taskInstanceList ) {
for ( TaskInstance taskInstance : taskInstanceList ) {
if ( taskInstance . getName ( ) . equals ( taskName ) ) {
if ( taskInstance . getName ( ) . equals ( taskName ) ) {
return taskInstance ;
return taskInstance ;
@ -706,7 +706,7 @@ public class MasterExecThread implements Runnable {
* @return process instance execution status
* @return process instance execution status
* /
* /
private ExecutionStatus getProcessInstanceState ( ) {
private ExecutionStatus getProcessInstanceState ( ) {
ProcessInstance instance = processDao . findProcessInstanceById ( processInstance . getId ( ) ) ;
ProcessInstance instance = processService . findProcessInstanceById ( processInstance . getId ( ) ) ;
ExecutionStatus state = instance . getState ( ) ;
ExecutionStatus state = instance . getState ( ) ;
if ( activeTaskNode . size ( ) > 0 ) {
if ( activeTaskNode . size ( ) > 0 ) {
@ -784,10 +784,10 @@ public class MasterExecThread implements Runnable {
processInstance . getState ( ) . toString ( ) , state . toString ( ) ,
processInstance . getState ( ) . toString ( ) , state . toString ( ) ,
processInstance . getCommandType ( ) . toString ( ) ) ;
processInstance . getCommandType ( ) . toString ( ) ) ;
processInstance . setState ( state ) ;
processInstance . setState ( state ) ;
ProcessInstance instance = processDao . findProcessInstanceById ( processInstance . getId ( ) ) ;
ProcessInstance instance = processService . findProcessInstanceById ( processInstance . getId ( ) ) ;
instance . setState ( state ) ;
instance . setState ( state ) ;
instance . setProcessDefinition ( processInstance . getProcessDefinition ( ) ) ;
instance . setProcessDefinition ( processInstance . getProcessDefinition ( ) ) ;
processDao . updateProcessInstance ( instance ) ;
processService . updateProcessInstance ( instance ) ;
processInstance = instance ;
processInstance = instance ;
}
}
}
}
@ -845,7 +845,7 @@ public class MasterExecThread implements Runnable {
// send warning email if process time out.
// send warning email if process time out.
if ( ! sendTimeWarning & & checkProcessTimeOut ( processInstance ) ) {
if ( ! sendTimeWarning & & checkProcessTimeOut ( processInstance ) ) {
alertManager . sendProcessTimeoutAlert ( processInstance ,
alertManager . sendProcessTimeoutAlert ( processInstance ,
processDao . findProcessDefineById ( processInstance . getProcessDefinitionId ( ) ) ) ;
processService . findProcessDefineById ( processInstance . getProcessDefinitionId ( ) ) ) ;
sendTimeWarning = true ;
sendTimeWarning = true ;
}
}
for ( Map . Entry < MasterBaseTaskExecThread , Future < Boolean > > entry : activeTaskNode . entrySet ( ) ) {
for ( Map . Entry < MasterBaseTaskExecThread , Future < Boolean > > entry : activeTaskNode . entrySet ( ) ) {
@ -903,7 +903,7 @@ public class MasterExecThread implements Runnable {
if ( completeTask . getState ( ) = = ExecutionStatus . PAUSE ) {
if ( completeTask . getState ( ) = = ExecutionStatus . PAUSE ) {
completeTask . setState ( ExecutionStatus . KILL ) ;
completeTask . setState ( ExecutionStatus . KILL ) ;
completeTaskList . put ( entry . getKey ( ) , completeTask ) ;
completeTaskList . put ( entry . getKey ( ) , completeTask ) ;
processDao . updateTaskInstance ( completeTask ) ;
processService . updateTaskInstance ( completeTask ) ;
}
}
}
}
}
}
@ -961,7 +961,7 @@ public class MasterExecThread implements Runnable {
Future < Boolean > future = entry . getValue ( ) ;
Future < Boolean > future = entry . getValue ( ) ;
TaskInstance taskInstance = taskExecThread . getTaskInstance ( ) ;
TaskInstance taskInstance = taskExecThread . getTaskInstance ( ) ;
taskInstance = processDao . findTaskInstanceById ( taskInstance . getId ( ) ) ;
taskInstance = processService . findTaskInstanceById ( taskInstance . getId ( ) ) ;
if ( taskInstance . getState ( ) . typeIsFinished ( ) ) {
if ( taskInstance . getState ( ) . typeIsFinished ( ) ) {
continue ;
continue ;
}
}
@ -1031,7 +1031,7 @@ public class MasterExecThread implements Runnable {
}
}
try {
try {
Integer intId = Integer . valueOf ( taskId ) ;
Integer intId = Integer . valueOf ( taskId ) ;
TaskInstance task = processDao . findTaskInstanceById ( intId ) ;
TaskInstance task = processService . findTaskInstanceById ( intId ) ;
if ( task = = null ) {
if ( task = = null ) {
logger . error ( "start node id cannot be found: {}" , taskId ) ;
logger . error ( "start node id cannot be found: {}" , taskId ) ;
} else {
} else {