@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType ;
import org.apache.dolphinscheduler.common.enums.AuthorizationType ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
import org.apache.dolphinscheduler.common.enums.ConditionType ;
import org.apache.dolphinscheduler.common.enums.ConditionType ;
import org.apache.dolphinscheduler.common.enums.CycleEnum ;
import org.apache.dolphinscheduler.common.enums.Direct ;
import org.apache.dolphinscheduler.common.enums.Direct ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
import org.apache.dolphinscheduler.common.enums.FailureStrategy ;
import org.apache.dolphinscheduler.common.enums.FailureStrategy ;
@ -64,7 +63,6 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeExceptio
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils ;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils ;
import org.apache.dolphinscheduler.dao.entity.Command ;
import org.apache.dolphinscheduler.dao.entity.Command ;
import org.apache.dolphinscheduler.dao.entity.CycleDependency ;
import org.apache.dolphinscheduler.dao.entity.DagData ;
import org.apache.dolphinscheduler.dao.entity.DagData ;
import org.apache.dolphinscheduler.dao.entity.DataSource ;
import org.apache.dolphinscheduler.dao.entity.DataSource ;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand ;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand ;
@ -108,11 +106,9 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.service.exceptions.ServiceException ;
import org.apache.dolphinscheduler.service.exceptions.ServiceException ;
import org.apache.dolphinscheduler.service.log.LogClientService ;
import org.apache.dolphinscheduler.service.log.LogClientService ;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils ;
import java.util.ArrayList ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.Arrays ;
import java.util.Calendar ;
import java.util.Date ;
import java.util.Date ;
import java.util.EnumMap ;
import java.util.EnumMap ;
import java.util.HashMap ;
import java.util.HashMap ;
@ -125,14 +121,12 @@ import java.util.Objects;
import java.util.Set ;
import java.util.Set ;
import java.util.stream.Collectors ;
import java.util.stream.Collectors ;
import org.quartz.CronExpression ;
import org.slf4j.Logger ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Component ;
import org.springframework.stereotype.Component ;
import org.springframework.transaction.annotation.Transactional ;
import org.springframework.transaction.annotation.Transactional ;
import com.cronutils.model.Cron ;
import com.fasterxml.jackson.databind.JsonNode ;
import com.fasterxml.jackson.databind.JsonNode ;
import com.fasterxml.jackson.databind.node.ObjectNode ;
import com.fasterxml.jackson.databind.node.ObjectNode ;
@ -403,6 +397,16 @@ public class ProcessService {
return processDefinition ;
return processDefinition ;
}
}
/ * *
* find process define by code .
*
* @param processDefinitionCode processDefinitionCode
* @return process definition
* /
public ProcessDefinition findProcessDefinitionByCode ( Long processDefinitionCode ) {
return processDefineMapper . queryByCode ( processDefinitionCode ) ;
}
/ * *
/ * *
* delete work process instance by id
* delete work process instance by id
*
*
@ -483,7 +487,6 @@ public class ProcessService {
public void recurseFindSubProcessId ( int parentId , List < Integer > ids ) {
public void recurseFindSubProcessId ( int parentId , List < Integer > ids ) {
List < TaskDefinition > taskNodeList = this . getTaskNodeListByDefinitionId ( parentId ) ;
List < TaskDefinition > taskNodeList = this . getTaskNodeListByDefinitionId ( parentId ) ;
if ( taskNodeList ! = null & & ! taskNodeList . isEmpty ( ) ) {
if ( taskNodeList ! = null & & ! taskNodeList . isEmpty ( ) ) {
for ( TaskDefinition taskNode : taskNodeList ) {
for ( TaskDefinition taskNode : taskNodeList ) {
@ -1628,7 +1631,6 @@ public class ProcessService {
taskInstance . setTaskParams ( JSONUtils . toJsonString ( taskParams ) ) ;
taskInstance . setTaskParams ( JSONUtils . toJsonString ( taskParams ) ) ;
}
}
/ * *
/ * *
* convert integer list to string list
* convert integer list to string list
*
*
@ -1657,13 +1659,13 @@ public class ProcessService {
}
}
/ * *
/ * *
* query Schedule by processDefinitionId
* query Schedule by processDefinitionCode
*
*
* @param processDefinitionId processDefinitionId
* @param processDefinitionCode processDefinitionCode
* @see Schedule
* @see Schedule
* /
* /
public List < Schedule > queryReleaseSchedulerListByProcessDefinitionId ( int processDefinitionId ) {
public List < Schedule > queryReleaseSchedulerListByProcessDefinitionCode ( long processDefinitionCode ) {
return scheduleMapper . queryReleaseSchedulerListByProcessDefinitionId ( processDefinitionId ) ;
return scheduleMapper . queryReleaseSchedulerListByProcessDefinitionCode ( processDefinitionCode ) ;
}
}
/ * *
/ * *
@ -1730,7 +1732,6 @@ public class ProcessService {
ProcessInstance instance = processInstanceMapper . selectById ( processInstanceId ) ;
ProcessInstance instance = processInstanceMapper . selectById ( processInstanceId ) ;
instance . setState ( executionStatus ) ;
instance . setState ( executionStatus ) ;
return processInstanceMapper . updateById ( instance ) ;
return processInstanceMapper . updateById ( instance ) ;
}
}
/ * *
/ * *
@ -1785,98 +1786,13 @@ public class ProcessService {
}
}
/ * *
/ * *
* find schedule list by process define id .
* find schedule list by process define codes .
*
*
* @param ids id s
* @param codes code s
* @return schedule list
* @return schedule list
* /
* /
public List < Schedule > selectAllByProcessDefineId ( int [ ] ids ) {
public List < Schedule > selectAllByProcessDefineCode ( long [ ] codes ) {
return scheduleMapper . selectAllByProcessDefineArray (
return scheduleMapper . selectAllByProcessDefineArray ( codes ) ;
ids ) ;
}
/ * *
* get dependency cycle by work process define id and scheduler fire time
*
* @param masterId masterId
* @param processDefinitionId processDefinitionId
* @param scheduledFireTime the time the task schedule is expected to trigger
* @return CycleDependency
* @throws Exception if error throws Exception
* /
public CycleDependency getCycleDependency ( int masterId , int processDefinitionId , Date scheduledFireTime ) throws Exception {
List < CycleDependency > list = getCycleDependencies ( masterId , new int [ ] { processDefinitionId } , scheduledFireTime ) ;
return ! list . isEmpty ( ) ? list . get ( 0 ) : null ;
}
/ * *
* get dependency cycle list by work process define id list and scheduler fire time
*
* @param masterId masterId
* @param ids ids
* @param scheduledFireTime the time the task schedule is expected to trigger
* @return CycleDependency list
* @throws Exception if error throws Exception
* /
public List < CycleDependency > getCycleDependencies ( int masterId , int [ ] ids , Date scheduledFireTime ) throws Exception {
List < CycleDependency > cycleDependencyList = new ArrayList < > ( ) ;
if ( null = = ids | | ids . length = = 0 ) {
logger . warn ( "ids[] is empty!is invalid!" ) ;
return cycleDependencyList ;
}
if ( scheduledFireTime = = null ) {
logger . warn ( "scheduledFireTime is null!is invalid!" ) ;
return cycleDependencyList ;
}
String strCrontab = "" ;
CronExpression depCronExpression ;
Cron depCron ;
List < Date > list ;
List < Schedule > schedules = this . selectAllByProcessDefineId ( ids ) ;
// for all scheduling information
for ( Schedule depSchedule : schedules ) {
strCrontab = depSchedule . getCrontab ( ) ;
depCronExpression = CronUtils . parse2CronExpression ( strCrontab ) ;
depCron = CronUtils . parse2Cron ( strCrontab ) ;
CycleEnum cycleEnum = CronUtils . getMiniCycle ( depCron ) ;
if ( cycleEnum = = null ) {
logger . error ( "{} is not valid" , strCrontab ) ;
continue ;
}
Calendar calendar = Calendar . getInstance ( ) ;
switch ( cycleEnum ) {
case HOUR :
calendar . add ( Calendar . HOUR , - 25 ) ;
break ;
case DAY :
case WEEK :
calendar . add ( Calendar . DATE , - 32 ) ;
break ;
case MONTH :
calendar . add ( Calendar . MONTH , - 13 ) ;
break ;
default :
String cycleName = cycleEnum . name ( ) ;
logger . warn ( "Dependent process definition's cycleEnum is {},not support!!" , cycleName ) ;
continue ;
}
Date start = calendar . getTime ( ) ;
if ( depSchedule . getProcessDefinitionId ( ) = = masterId ) {
list = CronUtils . getSelfFireDateList ( start , scheduledFireTime , depCronExpression ) ;
} else {
list = CronUtils . getFireDateList ( start , scheduledFireTime , depCronExpression ) ;
}
if ( ! list . isEmpty ( ) ) {
start = list . get ( list . size ( ) - 1 ) ;
CycleDependency dependency = new CycleDependency ( depSchedule . getProcessDefinitionId ( ) , start , CronUtils . getExpirationTime ( start , cycleEnum ) , cycleEnum ) ;
cycleDependencyList . add ( dependency ) ;
}
}
return cycleDependencyList ;
}
}
/ * *
/ * *