@ -14,39 +14,62 @@
* See the License for the specific language governing permissions and
* limitations under the License .
* /
package org.apache.dolphinscheduler.api.service ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES ;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT ;
import org.apache.dolphinscheduler.api.enums.ExecuteType ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.* ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
import org.apache.dolphinscheduler.common.enums.FailureStrategy ;
import org.apache.dolphinscheduler.common.enums.Priority ;
import org.apache.dolphinscheduler.common.enums.ReleaseState ;
import org.apache.dolphinscheduler.common.enums.RunMode ;
import org.apache.dolphinscheduler.common.enums.TaskDependType ;
import org.apache.dolphinscheduler.common.enums.WarningType ;
import org.apache.dolphinscheduler.common.model.Server ;
import org.apache.dolphinscheduler.common.utils.CollectionUtils ;
import org.apache.dolphinscheduler.common.utils.DateUtils ;
import org.apache.dolphinscheduler.common.utils.* ;
import org.apache.dolphinscheduler.common.utils.JSONUtils ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.dao.entity.* ;
import org.apache.dolphinscheduler.dao.entity.Command ;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.Project ;
import org.apache.dolphinscheduler.dao.entity.Schedule ;
import org.apache.dolphinscheduler.dao.entity.Tenant ;
import org.apache.dolphinscheduler.dao.entity.User ;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper ;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper ;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils ;
import java.text.ParseException ;
import java.util.ArrayList ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.LinkedList ;
import java.util.List ;
import java.util.Map ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import java.text.ParseException ;
import java.util.* ;
import static org.apache.dolphinscheduler.common.Constants.* ;
/ * *
* executor service
* /
@Service
public class ExecutorService extends BaseService {
public class ExecutorService extends BaseService {
private static final Logger logger = LoggerFactory . getLogger ( ExecutorService . class ) ;
@ -73,22 +96,22 @@ public class ExecutorService extends BaseService{
/ * *
* execute process instance
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionId process Definition Id
* @param cronTime cron time
* @param commandType command type
* @param failureStrategy failuer strategy
* @param startNodeList start nodelist
* @param taskDependType node dependency type
* @param warningType warning type
* @param warningGroupId notify group id
* @param receivers receivers
* @param receiversCc receivers cc
* @param loginUser login user
* @param projectName project name
* @param processDefinitionId process Definition Id
* @param cronTime cron time
* @param commandType command type
* @param failureStrategy failuer strategy
* @param startNodeList start nodelist
* @param taskDependType node dependency type
* @param warningType warning type
* @param warningGroupId notify group id
* @param receivers receivers
* @param receiversCc receivers cc
* @param processInstancePriority process instance priority
* @param workerGroup worker group name
* @param runMode run mode
* @param timeout timeout
* @param timeout timeout
* @return execute process instance code
* @throws ParseException Parse Exception
* /
@ -101,23 +124,23 @@ public class ExecutorService extends BaseService{
Map < String , Object > result = new HashMap < > ( ) ;
// timeout is invalid
if ( timeout < = 0 | | timeout > MAX_TASK_TIMEOUT ) {
putMsg ( result , Status . TASK_TIMEOUT_PARAMS_ERROR ) ;
putMsg ( result , Status . TASK_TIMEOUT_PARAMS_ERROR ) ;
return result ;
}
Project project = projectMapper . queryByName ( projectName ) ;
Map < String , Object > checkResultAndAuth = checkResultAndAuth ( loginUser , projectName , project ) ;
if ( checkResultAndAuth ! = null ) {
if ( checkResultAndAuth ! = null ) {
return checkResultAndAuth ;
}
// check process define release state
ProcessDefinition processDefinition = processDefinitionMapper . selectById ( processDefinitionId ) ;
result = checkProcessDefinitionValid ( processDefinition , processDefinitionId ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
if ( ! checkTenantSuitable ( processDefinition ) ) {
if ( ! checkTenantSuitable ( processDefinition ) ) {
logger . error ( "there is not any valid tenant for the process definition: id:{},name:{}, " ,
processDefinition . getId ( ) , processDefinition . getName ( ) ) ;
putMsg ( result , Status . TENANT_NOT_SUITABLE ) ;
@ -129,14 +152,13 @@ public class ExecutorService extends BaseService{
return result ;
}
/ * *
* create command
* /
int create = this . createCommand ( commandType , processDefinitionId ,
taskDependType , failureStrategy , startNodeList , cronTime , warningType , loginUser . getId ( ) ,
warningGroupId , runMode , processInstancePriority , workerGroup ) ;
if ( create > 0 ) {
warningGroupId , runMode , processInstancePriority , workerGroup ) ;
if ( create > 0 ) {
/ * *
* according to the process definition ID updateProcessInstance and CC recipient
* /
@ -152,6 +174,7 @@ public class ExecutorService extends BaseService{
/ * *
* check whether master exists
*
* @param result result
* @return master exists return true , otherwise return false
* /
@ -167,7 +190,6 @@ public class ExecutorService extends BaseService{
return true ;
}
/ * *
* check whether the process definition can be executed
*
@ -175,22 +197,20 @@ public class ExecutorService extends BaseService{
* @param processDefineId process definition id
* @return check result code
* /
public Map < String , Object > checkProcessDefinitionValid ( ProcessDefinition processDefinition , int processDefineId ) {
public Map < String , Object > checkProcessDefinitionValid ( ProcessDefinition processDefinition , int processDefineId ) {
Map < String , Object > result = new HashMap < > ( ) ;
if ( processDefinition = = null ) {
// check process definition exists
putMsg ( result , Status . PROCESS_DEFINE_NOT_EXIST , processDefineId ) ;
putMsg ( result , Status . PROCESS_DEFINE_NOT_EXIST , processDefineId ) ;
} else if ( processDefinition . getReleaseState ( ) ! = ReleaseState . ONLINE ) {
// check process definition online
putMsg ( result , Status . PROCESS_DEFINE_NOT_RELEASE , processDefineId ) ;
} else {
putMsg ( result , Status . PROCESS_DEFINE_NOT_RELEASE , processDefineId ) ;
} else {
result . put ( Constants . STATUS , Status . SUCCESS ) ;
}
return result ;
}
/ * *
* do action to process instance : pause , stop , repeat , recover from pause , recover from stop
*
@ -214,7 +234,6 @@ public class ExecutorService extends BaseService{
return result ;
}
ProcessInstance processInstance = processService . findProcessInstanceDetailById ( processInstanceId ) ;
if ( processInstance = = null ) {
putMsg ( result , Status . PROCESS_INSTANCE_NOT_EXIST , processInstanceId ) ;
@ -222,7 +241,7 @@ public class ExecutorService extends BaseService{
}
ProcessDefinition processDefinition = processService . findProcessDefineById ( processInstance . getProcessDefinitionId ( ) ) ;
if ( executeType ! = ExecuteType . STOP & & executeType ! = ExecuteType . PAUSE ) {
if ( executeType ! = ExecuteType . STOP & & executeType ! = ExecuteType . PAUSE ) {
result = checkProcessDefinitionValid ( processDefinition , processInstance . getProcessDefinitionId ( ) ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
@ -234,7 +253,7 @@ public class ExecutorService extends BaseService{
if ( status ! = Status . SUCCESS ) {
return checkResult ;
}
if ( ! checkTenantSuitable ( processDefinition ) ) {
if ( ! checkTenantSuitable ( processDefinition ) ) {
logger . error ( "there is not any valid tenant for the process definition: id:{},name:{}, " ,
processDefinition . getId ( ) , processDefinition . getName ( ) ) ;
putMsg ( result , Status . TENANT_NOT_SUITABLE ) ;
@ -275,6 +294,7 @@ public class ExecutorService extends BaseService{
/ * *
* check tenant suitable
*
* @param processDefinition process definition
* @return true if tenant suitable , otherwise return false
* /
@ -315,7 +335,7 @@ public class ExecutorService extends BaseService{
}
break ;
case RECOVER_SUSPENDED_PROCESS :
if ( executionStatus . typeIsPause ( ) | | executionStatus . typeIsCancel ( ) ) {
if ( executionStatus . typeIsPause ( ) | | executionStatus . typeIsCancel ( ) ) {
checkResult = true ;
}
break ;
@ -323,7 +343,7 @@ public class ExecutorService extends BaseService{
break ;
}
if ( ! checkResult ) {
putMsg ( result , Status . PROCESS_INSTANCE_STATE_OPERATION_ERROR , processInstance . getName ( ) , executionStatus . toString ( ) , executeType . toString ( ) ) ;
putMsg ( result , Status . PROCESS_INSTANCE_STATE_OPERATION_ERROR , processInstance . getName ( ) , executionStatus . toString ( ) , executeType . toString ( ) ) ;
} else {
putMsg ( result , Status . SUCCESS ) ;
}
@ -331,7 +351,7 @@ public class ExecutorService extends BaseService{
}
/ * *
* prepare to update process instance command type and status
* prepare to update process instance command type and status
*
* @param processInstance process instance
* @param commandType command type
@ -370,11 +390,11 @@ public class ExecutorService extends BaseService{
command . setCommandType ( commandType ) ;
command . setProcessDefinitionId ( processDefinitionId ) ;
command . setCommandParam ( String . format ( "{\"%s\":%d}" ,
CMDPARAM_RECOVER_PROCESS_ID_STRING , instanceId ) ) ;
CMD_ PARAM_RECOVER_PROCESS_ID_STRING , instanceId ) ) ;
command . setExecutorId ( loginUser . getId ( ) ) ;
if ( ! processService . verifyIsNeedCreateCommand ( command ) ) {
putMsg ( result , Status . PROCESS_INSTANCE_EXECUTING_COMMAND , processDefinitionId ) ;
if ( ! processService . verifyIsNeedCreateCommand ( command ) ) {
putMsg ( result , Status . PROCESS_INSTANCE_EXECUTING_COMMAND , processDefinitionId ) ;
return result ;
}
@ -391,28 +411,29 @@ public class ExecutorService extends BaseService{
/ * *
* check if sub processes are offline before starting process definition
*
* @param processDefineId process definition id
* @return check result code
* /
public Map < String , Object > startCheckByProcessDefinedId ( int processDefineId ) {
Map < String , Object > result = new HashMap < > ( ) ;
if ( processDefineId = = 0 ) {
if ( processDefineId = = 0 ) {
logger . error ( "process definition id is null" ) ;
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , "process definition id" ) ;
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , "process definition id" ) ;
}
List < Integer > ids = new ArrayList < > ( ) ;
processService . recurseFindSubProcessId ( processDefineId , ids ) ;
Integer [ ] idArray = ids . toArray ( new Integer [ ids . size ( ) ] ) ;
if ( ! ids . isEmpty ( ) ) {
if ( ! ids . isEmpty ( ) ) {
List < ProcessDefinition > processDefinitionList = processDefinitionMapper . queryDefinitionListByIdList ( idArray ) ;
if ( processDefinitionList ! = null ) {
for ( ProcessDefinition processDefinition : processDefinitionList ) {
if ( processDefinitionList ! = null ) {
for ( ProcessDefinition processDefinition : processDefinitionList ) {
/ * *
* if there is no online process , exit directly
* /
if ( processDefinition . getReleaseState ( ) ! = ReleaseState . ONLINE ) {
putMsg ( result , Status . PROCESS_DEFINE_NOT_RELEASE , processDefinition . getName ( ) ) ;
if ( processDefinition . getReleaseState ( ) ! = ReleaseState . ONLINE ) {
putMsg ( result , Status . PROCESS_DEFINE_NOT_RELEASE , processDefinition . getName ( ) ) ;
logger . info ( "not release process definition id: {} , name : {}" ,
processDefinition . getId ( ) , processDefinition . getName ( ) ) ;
return result ;
@ -431,13 +452,13 @@ public class ExecutorService extends BaseService{
* @param processInstanceId process instance id
* @return receivers cc list
* /
public Map < String , Object > getReceiverCc ( Integer processDefineId , Integer processInstanceId ) {
public Map < String , Object > getReceiverCc ( Integer processDefineId , Integer processInstanceId ) {
Map < String , Object > result = new HashMap < > ( ) ;
logger . info ( "processInstanceId {}" , processInstanceId ) ;
if ( processDefineId = = null & & processInstanceId = = null ) {
logger . info ( "processInstanceId {}" , processInstanceId ) ;
if ( processDefineId = = null & & processInstanceId = = null ) {
throw new RuntimeException ( "You must set values for parameters processDefineId or processInstanceId" ) ;
}
if ( processDefineId = = null & & processInstanceId ! = null ) {
if ( processDefineId = = null & & processInstanceId ! = null ) {
ProcessInstance processInstance = processInstanceMapper . selectById ( processInstanceId ) ;
if ( processInstance = = null ) {
throw new RuntimeException ( "processInstanceId is not exists" ) ;
@ -445,24 +466,24 @@ public class ExecutorService extends BaseService{
processDefineId = processInstance . getProcessDefinitionId ( ) ;
}
ProcessDefinition processDefinition = processDefinitionMapper . selectById ( processDefineId ) ;
if ( processDefinition = = null ) {
throw new RuntimeException ( String . format ( "processDefineId %d is not exists" , processDefineId ) ) ;
if ( processDefinition = = null ) {
throw new RuntimeException ( String . format ( "processDefineId %d is not exists" , processDefineId ) ) ;
}
String receivers = processDefinition . getReceivers ( ) ;
String receiversCc = processDefinition . getReceiversCc ( ) ;
Map < String , String > dataMap = new HashMap < > ( ) ;
dataMap . put ( Constants . RECEIVERS , receivers ) ;
dataMap . put ( Constants . RECEIVERS_CC , receiversCc ) ;
Map < String , String > dataMap = new HashMap < > ( ) ;
dataMap . put ( Constants . RECEIVERS , receivers ) ;
dataMap . put ( Constants . RECEIVERS_CC , receiversCc ) ;
result . put ( Constants . DATA_LIST , dataMap ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
/ * *
* create command
*
* @param commandType commandType
* @param processDefineId processDefineId
* @param nodeDep nodeDep
@ -476,37 +497,36 @@ public class ExecutorService extends BaseService{
* @param processInstancePriority processInstancePriority
* @param workerGroup workerGroup
* @return command id
* @throws ParseException
* /
private int createCommand ( CommandType commandType , int processDefineId ,
TaskDependType nodeDep , FailureStrategy failureStrategy ,
String startNodeList , String schedule , WarningType warningType ,
int executorId , int warningGroupId ,
RunMode runMode , Priority processInstancePriority , String workerGroup ) throws ParseException {
RunMode runMode , Priority processInstancePriority , String workerGroup ) throws ParseException {
/ * *
* instantiate command schedule instance
* /
Command command = new Command ( ) ;
Map < String , String > cmdParam = new HashMap < > ( ) ;
if ( commandType = = null ) {
Map < String , String > cmdParam = new HashMap < > ( ) ;
if ( commandType = = null ) {
command . setCommandType ( CommandType . START_PROCESS ) ;
} else {
} else {
command . setCommandType ( commandType ) ;
}
command . setProcessDefinitionId ( processDefineId ) ;
if ( nodeDep ! = null ) {
if ( nodeDep ! = null ) {
command . setTaskDependType ( nodeDep ) ;
}
if ( failureStrategy ! = null ) {
if ( failureStrategy ! = null ) {
command . setFailureStrategy ( failureStrategy ) ;
}
if ( StringUtils . isNotEmpty ( startNodeList ) ) {
cmdParam . put ( CMDPARAM_START_NODE_NAMES , startNodeList ) ;
if ( StringUtils . isNotEmpty ( startNodeList ) ) {
cmdParam . put ( CMD_ PARAM_START_NODE_NAMES , startNodeList ) ;
}
if ( warningType ! = null ) {
if ( warningType ! = null ) {
command . setWarningType ( warningType ) ;
}
command . setCommandParam ( JSONUtils . toJsonString ( cmdParam ) ) ;
@ -517,32 +537,32 @@ public class ExecutorService extends BaseService{
Date start = null ;
Date end = null ;
if ( StringUtils . isNotEmpty ( schedule ) ) {
if ( StringUtils . isNotEmpty ( schedule ) ) {
String [ ] interval = schedule . split ( "," ) ;
if ( interval . length = = 2 ) {
if ( interval . length = = 2 ) {
start = DateUtils . getScheduleDate ( interval [ 0 ] ) ;
end = DateUtils . getScheduleDate ( interval [ 1 ] ) ;
}
}
// determine whether to complement
if ( commandType = = CommandType . COMPLEMENT_DATA ) {
if ( commandType = = CommandType . COMPLEMENT_DATA ) {
runMode = ( runMode = = null ) ? RunMode . RUN_MODE_SERIAL : runMode ;
if ( null ! = start & & null ! = end & & ! start . after ( end ) ) {
if ( runMode = = RunMode . RUN_MODE_SERIAL ) {
if ( null ! = start & & null ! = end & & ! start . after ( end ) ) {
if ( runMode = = RunMode . RUN_MODE_SERIAL ) {
cmdParam . put ( CMDPARAM_COMPLEMENT_DATA_START_DATE , DateUtils . dateToString ( start ) ) ;
cmdParam . put ( CMDPARAM_COMPLEMENT_DATA_END_DATE , DateUtils . dateToString ( end ) ) ;
command . setCommandParam ( JSONUtils . toJsonString ( cmdParam ) ) ;
return processService . createCommand ( command ) ;
} else if ( runMode = = RunMode . RUN_MODE_PARALLEL ) {
} else if ( runMode = = RunMode . RUN_MODE_PARALLEL ) {
List < Schedule > schedules = processService . queryReleaseSchedulerListByProcessDefinitionId ( processDefineId ) ;
List < Date > listDate = new LinkedList < > ( ) ;
if ( ! CollectionUtils . isEmpty ( schedules ) ) {
if ( ! CollectionUtils . isEmpty ( schedules ) ) {
for ( Schedule item : schedules ) {
listDate . addAll ( CronUtils . getSelfFireDateList ( start , end , item . getCrontab ( ) ) ) ;
}
}
if ( ! CollectionUtils . isEmpty ( listDate ) ) {
if ( ! CollectionUtils . isEmpty ( listDate ) ) {
// loop by schedule date
for ( Date date : listDate ) {
cmdParam . put ( CMDPARAM_COMPLEMENT_DATA_START_DATE , DateUtils . dateToString ( date ) ) ;
@ -551,10 +571,10 @@ public class ExecutorService extends BaseService{
processService . createCommand ( command ) ;
}
return listDate . size ( ) ;
} else {
} else {
// loop by day
int runCunt = 0 ;
while ( ! start . after ( end ) ) {
while ( ! start . after ( end ) ) {
runCunt + = 1 ;
cmdParam . put ( CMDPARAM_COMPLEMENT_DATA_START_DATE , DateUtils . dateToString ( start ) ) ;
cmdParam . put ( CMDPARAM_COMPLEMENT_DATA_END_DATE , DateUtils . dateToString ( start ) ) ;
@ -565,11 +585,11 @@ public class ExecutorService extends BaseService{
return runCunt ;
}
}
} else {
} else {
logger . error ( "there is not valid schedule date for the process definition: id:{},date:{}" ,
processDefineId , schedule ) ;
}
} else {
} else {
command . setCommandParam ( JSONUtils . toJsonString ( cmdParam ) ) ;
return processService . createCommand ( command ) ;
}
@ -579,11 +599,6 @@ public class ExecutorService extends BaseService{
/ * *
* check result and auth
*
* @param loginUser
* @param projectName
* @param project
* @return
* /
private Map < String , Object > checkResultAndAuth ( User loginUser , String projectName , Project project ) {
// check project auth