@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License .
* /
package org.apache.dolphinscheduler.service.process ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE ;
@ -145,7 +146,6 @@ public class ProcessService {
private ResourceMapper resourceMapper ;
@Autowired
private ErrorCommandMapper errorCommandMapper ;
@ -157,6 +157,7 @@ public class ProcessService {
/ * *
* handle Command ( construct ProcessInstance from Command ) , wrapped in transaction
*
* @param logger logger
* @param host host
* @param validThreadNum validThreadNum
@ -186,6 +187,7 @@ public class ProcessService {
/ * *
* save error command , and delete original command
*
* @param command command
* @param message message
* /
@ -198,6 +200,7 @@ public class ProcessService {
/ * *
* set process waiting thread
*
* @param command command
* @param processInstance processInstance
* @return process instance
@ -215,6 +218,7 @@ public class ProcessService {
/ * *
* check thread num
*
* @param command command
* @param validThreadNum validThreadNum
* @return if thread is enough
@ -226,6 +230,7 @@ public class ProcessService {
/ * *
* insert one command
*
* @param command command
* @return create result
* /
@ -239,6 +244,7 @@ public class ProcessService {
/ * *
* find one command from queue list
*
* @return command
* /
public Command findOneCommand ( ) {
@ -247,6 +253,7 @@ public class ProcessService {
/ * *
* check the input command exists in queue list
*
* @param command command
* @return create command result
* /
@ -279,6 +286,7 @@ public class ProcessService {
/ * *
* find process instance detail by id
*
* @param processId processId
* @return process instance
* /
@ -288,6 +296,7 @@ public class ProcessService {
/ * *
* get task node list by definitionId
*
* @param defineId
* @return
* /
@ -312,6 +321,7 @@ public class ProcessService {
/ * *
* find process instance by id
*
* @param processId processId
* @return process instance
* /
@ -321,6 +331,7 @@ public class ProcessService {
/ * *
* find process define by id .
*
* @param processDefinitionId processDefinitionId
* @return process definition
* /
@ -330,6 +341,7 @@ public class ProcessService {
/ * *
* delete work process instance by id
*
* @param processInstanceId processInstanceId
* @return delete process instance result
* /
@ -339,6 +351,7 @@ public class ProcessService {
/ * *
* delete all sub process by parent instance id
*
* @param processInstanceId processInstanceId
* @return delete all sub process instance result
* /
@ -355,9 +368,9 @@ public class ProcessService {
return 1 ;
}
/ * *
* remove task log file
*
* @param processInstanceId processInstanceId
* /
public void removeTaskLogFile ( Integer processInstanceId ) {
@ -383,16 +396,14 @@ public class ProcessService {
// compatible old version
ip = taskInstance . getHost ( ) ;
}
// remove task log from loggerserver
logClient . removeTaskLog ( ip , port , taskLogPath ) ;
}
}
/ * *
* calculate sub process number in the process define .
*
* @param processDefinitionId processDefinitionId
* @return process thread num count
* /
@ -404,6 +415,7 @@ public class ProcessService {
/ * *
* recursive query sub process definition id by parent id .
*
* @param parentId parentId
* @param ids ids
* /
@ -435,6 +447,7 @@ public class ProcessService {
* sub work process instance need not to create recovery command .
* create recovery waiting thread command and delete origin command at the same time .
* if the recovery command is exists , only update the field update_time
*
* @param originCommand originCommand
* @param processInstance processInstance
* /
@ -485,6 +498,7 @@ public class ProcessService {
/ * *
* get schedule time from command
*
* @param command command
* @param cmdParam cmdParam map
* @return date
@ -501,6 +515,7 @@ public class ProcessService {
/ * *
* generate a new work process instance from command .
*
* @param processDefinition processDefinition
* @param command command
* @param cmdParam cmdParam map
@ -558,6 +573,7 @@ public class ProcessService {
* there is tenant id in definition , use the tenant of the definition .
* if there is not tenant id in the definiton or the tenant not exist
* use definition creator ' s tenant .
*
* @param tenantId tenantId
* @param userId userId
* @return tenant
@ -581,6 +597,7 @@ public class ProcessService {
/ * *
* check command parameters is valid
*
* @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
@ -599,6 +616,7 @@ public class ProcessService {
/ * *
* construct process instance according to one command .
*
* @param command command
* @param host host
* @return process instance
@ -755,6 +773,7 @@ public class ProcessService {
/ * *
* return complement data if the process start with complement data
*
* @param processInstance processInstance
* @param command command
* @return command type
@ -769,6 +788,7 @@ public class ProcessService {
/ * *
* initialize complement data parameters
*
* @param processDefinition processDefinition
* @param processInstance processInstance
* @param cmdParam cmdParam
@ -792,11 +812,11 @@ public class ProcessService {
}
/ * *
* set sub work process parameters .
* handle sub work process instance , update relation table and command parameters
* set sub work process flag , extends parent work process command parameters
*
* @param subProcessInstance subProcessInstance
* @return process instance
* /
@ -841,6 +861,7 @@ public class ProcessService {
/ * *
* join parent global params into sub process .
* only the keys doesn ' t in sub process global would be joined .
*
* @param parentGlobalParams parentGlobalParams
* @param subGlobalParams subGlobalParams
* @return global params join
@ -862,6 +883,7 @@ public class ProcessService {
/ * *
* initialize task instance
*
* @param taskInstance taskInstance
* /
private void initTaskInstance ( TaskInstance taskInstance ) {
@ -880,6 +902,7 @@ public class ProcessService {
/ * *
* submit task to db
* submit sub process to command
*
* @param taskInstance taskInstance
* @return task instance
* /
@ -909,6 +932,7 @@ public class ProcessService {
* consider o
* repeat running does not generate new sub process instance
* set map { parent instance id , task instance id , 0 ( child instance id ) }
*
* @param parentInstance parentInstance
* @param parentTask parentTask
* @return process instance map
@ -937,6 +961,7 @@ public class ProcessService {
/ * *
* find previous task work process map .
*
* @param parentProcessInstance parentProcessInstance
* @param parentTask parentTask
* @return process instance map
@ -990,6 +1015,7 @@ public class ProcessService {
/ * *
* complement data needs transform parent parameter to child .
*
* @param instanceMap
* @param parentProcessInstance
* @return
@ -1011,6 +1037,7 @@ public class ProcessService {
/ * *
* create sub work process command
*
* @param parentProcessInstance
* @param childInstance
* @param instanceMap
@ -1043,6 +1070,7 @@ public class ProcessService {
/ * *
* initialize sub work flow state
* child instance state would be initialized when ' recovery from pause / stop / failure '
*
* @param childInstance
* /
private void initSubInstanceState ( ProcessInstance childInstance ) {
@ -1070,7 +1098,8 @@ public class ProcessService {
}
/ * *
* update sub process definition
* update sub process definition todo
*
* @param parentProcessInstance parentProcessInstance
* @param childDefinitionId childDefinitionId
* /
@ -1078,14 +1107,14 @@ public class ProcessService {
ProcessDefinition fatherDefinition = this . findProcessDefineById ( parentProcessInstance . getProcessDefinitionId ( ) ) ;
ProcessDefinition childDefinition = this . findProcessDefineById ( childDefinitionId ) ;
if ( childDefinition ! = null & & fatherDefinition ! = null ) {
childDefinition . setReceivers ( fatherDefinition . getReceivers ( ) ) ;
childDefinition . setReceiversCc ( fatherDefinition . getReceiversCc ( ) ) ;
childDefinition . setWarningGroupId ( fatherDefinition . getWarningGroupId ( ) ) ;
processDefineMapper . updateById ( childDefinition ) ;
}
}
/ * *
* submit task to mysql
*
* @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
@ -1132,10 +1161,10 @@ public class ProcessService {
return taskInstance ;
}
/ * *
* $ { processInstancePriority } _$ { processInstanceId } _$ { taskInstancePriority } _$ { taskInstanceId } _$ { task executed by ip1 } , $ { ip2 } . . .
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low .
*
* @param taskInstance taskInstance
* @return task zk queue str
* /
@ -1199,6 +1228,7 @@ public class ProcessService {
/ * *
* check process instance strategy
*
* @param taskInstance taskInstance
* @return check strategy result
* /
@ -1220,6 +1250,7 @@ public class ProcessService {
/ * *
* check the task instance existing in queue
*
* @param taskInstance taskInstance
* @return whether taskinstance exists queue
* /
@ -1235,6 +1266,7 @@ public class ProcessService {
/ * *
* create a new process instance
*
* @param processInstance processInstance
* /
public void createProcessInstance ( ProcessInstance processInstance ) {
@ -1246,6 +1278,7 @@ public class ProcessService {
/ * *
* insert or update work process instance to data base
*
* @param processInstance processInstance
* /
public void saveProcessInstance ( ProcessInstance processInstance ) {
@ -1263,6 +1296,7 @@ public class ProcessService {
/ * *
* insert or update command
*
* @param command command
* @return save command result
* /
@ -1276,6 +1310,7 @@ public class ProcessService {
/ * *
* insert or update task instance
*
* @param taskInstance taskInstance
* @return save task instance result
* /
@ -1289,6 +1324,7 @@ public class ProcessService {
/ * *
* insert task instance
*
* @param taskInstance taskInstance
* @return create task instance result
* /
@ -1299,6 +1335,7 @@ public class ProcessService {
/ * *
* update task instance
*
* @param taskInstance taskInstance
* @return update task instance result
* /
@ -1306,8 +1343,10 @@ public class ProcessService {
int count = taskInstanceMapper . updateById ( taskInstance ) ;
return count > 0 ;
}
/ * *
* delete a command by id
*
* @param id id
* /
public void delCommandByid ( int id ) {
@ -1316,6 +1355,7 @@ public class ProcessService {
/ * *
* find task instance by id
*
* @param taskId task id
* @return task intance
* /
@ -1323,9 +1363,9 @@ public class ProcessService {
return taskInstanceMapper . selectById ( taskId ) ;
}
/ * *
* package task instance , associate processInstance and processDefine
*
* @param taskInstId taskInstId
* @return task instance
* /
@ -1345,9 +1385,9 @@ public class ProcessService {
return taskInstance ;
}
/ * *
* get id list by task state
*
* @param instanceId instanceId
* @param state state
* @return task instance states
@ -1358,6 +1398,7 @@ public class ProcessService {
/ * *
* find valid task list by process definition id
*
* @param processInstanceId processInstanceId
* @return task instance list
* /
@ -1367,6 +1408,7 @@ public class ProcessService {
/ * *
* find previous task list by work process id
*
* @param processInstanceId processInstanceId
* @return task instance list
* /
@ -1376,6 +1418,7 @@ public class ProcessService {
/ * *
* update work process instance map
*
* @param processInstanceMap processInstanceMap
* @return update process instance result
* /
@ -1383,9 +1426,9 @@ public class ProcessService {
return processInstanceMapMapper . updateById ( processInstanceMap ) ;
}
/ * *
* create work process instance map
*
* @param processInstanceMap processInstanceMap
* @return create process instance result
* /
@ -1399,6 +1442,7 @@ public class ProcessService {
/ * *
* find work process map by parent process id and parent task id .
*
* @param parentWorkProcessId parentWorkProcessId
* @param parentTaskId parentTaskId
* @return process instance map
@ -1409,6 +1453,7 @@ public class ProcessService {
/ * *
* delete work process map by parent process id
*
* @param parentWorkProcessId parentWorkProcessId
* @return delete process map result
* /
@ -1419,6 +1464,7 @@ public class ProcessService {
/ * *
* find sub process instance
*
* @param parentProcessId parentProcessId
* @param parentTaskId parentTaskId
* @return process instance
@ -1435,6 +1481,7 @@ public class ProcessService {
/ * *
* find parent process instance
*
* @param subProcessId subProcessId
* @return process instance
* /
@ -1448,9 +1495,9 @@ public class ProcessService {
return processInstance ;
}
/ * *
* change task state
*
* @param state state
* @param startTime startTime
* @param host host
@ -1473,6 +1520,7 @@ public class ProcessService {
/ * *
* update process instance
*
* @param processInstance processInstance
* @return update process instance result
* /
@ -1482,6 +1530,7 @@ public class ProcessService {
/ * *
* update the process instance
*
* @param processInstanceId processInstanceId
* @param processJson processJson
* @param globalParams globalParams
@ -1508,6 +1557,7 @@ public class ProcessService {
/ * *
* change task state
*
* @param state state
* @param endTime endTime
* @param taskInstId taskInstId
@ -1530,6 +1580,7 @@ public class ProcessService {
/ * *
* convert integer list to string list
*
* @param intList intList
* @return string list
* /
@ -1546,6 +1597,7 @@ public class ProcessService {
/ * *
* query schedule by id
*
* @param id id
* @return schedule
* /
@ -1555,6 +1607,7 @@ public class ProcessService {
/ * *
* query Schedule by processDefinitionId
*
* @param processDefinitionId processDefinitionId
* @see Schedule
* /
@ -1564,6 +1617,7 @@ public class ProcessService {
/ * *
* query need failover process instance
*
* @param host host
* @return process instance list
* /
@ -1574,6 +1628,7 @@ public class ProcessService {
/ * *
* process need failover process instance
*
* @param processInstance processInstance
* /
@Transactional ( rollbackFor = RuntimeException . class )
@ -1593,6 +1648,7 @@ public class ProcessService {
/ * *
* query all need failover task instances by host
*
* @param host host
* @return task instance list
* /
@ -1603,6 +1659,7 @@ public class ProcessService {
/ * *
* find data source by id
*
* @param id id
* @return datasource
* /
@ -1610,9 +1667,9 @@ public class ProcessService {
return dataSourceMapper . selectById ( id ) ;
}
/ * *
* update process instance state by id
*
* @param processInstanceId processInstanceId
* @param executionStatus executionStatus
* @return update process result
@ -1626,6 +1683,7 @@ public class ProcessService {
/ * *
* find process instance by the task id
*
* @param taskId taskId
* @return process instance
* /
@ -1639,6 +1697,7 @@ public class ProcessService {
/ * *
* find udf function list by id list string
*
* @param ids ids
* @return udf function list
* /
@ -1648,6 +1707,7 @@ public class ProcessService {
/ * *
* find tenant code by resource name
*
* @param resName resource name
* @param resourceType resource type
* @return tenant code
@ -1658,6 +1718,7 @@ public class ProcessService {
/ * *
* find schedule list by process define id .
*
* @param ids ids
* @return schedule list
* /
@ -1668,6 +1729,7 @@ public class ProcessService {
/ * *
* get dependency cycle by work process define id and scheduler fire time
*
* @param masterId masterId
* @param processDefinitionId processDefinitionId
* @param scheduledFireTime the time the task schedule is expected to trigger
@ -1682,6 +1744,7 @@ public class ProcessService {
/ * *
* get dependency cycle list by work process define id list and scheduler fire time
*
* @param masterId masterId
* @param ids ids
* @param scheduledFireTime the time the task schedule is expected to trigger
@ -1699,7 +1762,6 @@ public class ProcessService {
return cycleDependencyList ;
}
String strCrontab = "" ;
CronExpression depCronExpression ;
Cron depCron ;
@ -1754,6 +1816,7 @@ public class ProcessService {
/ * *
* find last scheduler process instance in the date interval
*
* @param definitionId definitionId
* @param dateInterval dateInterval
* @return process instance
@ -1766,6 +1829,7 @@ public class ProcessService {
/ * *
* find last manual process instance interval
*
* @param definitionId process definition id
* @param dateInterval dateInterval
* @return process instance
@ -1778,6 +1842,7 @@ public class ProcessService {
/ * *
* find last running process instance
*
* @param definitionId process definition id
* @param startTime start time
* @param endTime end time
@ -1792,6 +1857,7 @@ public class ProcessService {
/ * *
* query user queue by process instance id
*
* @param processInstanceId processInstanceId
* @return queue
* /
@ -1809,16 +1875,14 @@ public class ProcessService {
return queue ;
}
/ * *
* get task worker group
*
* @param taskInstance taskInstance
* @return workerGroupId
* /
public String getTaskWorkerGroup ( TaskInstance taskInstance ) {
String workerGroup = taskInstance . getWorkerGroup ( ) ;
if ( StringUtils . isNotBlank ( workerGroup ) ) {
return workerGroup ;
}
@ -1834,6 +1898,7 @@ public class ProcessService {
/ * *
* get have perm project list
*
* @param userId userId
* @return project list
* /
@ -1853,6 +1918,7 @@ public class ProcessService {
/ * *
* get have perm project ids
*
* @param userId userId
* @return project ids
* /
@ -1867,6 +1933,7 @@ public class ProcessService {
/ * *
* list unauthorized udf function
*
* @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
@ -1898,6 +1965,8 @@ public class ProcessService {
Set < Integer > authorizedUdfs = udfFuncMapper . listAuthorizedUdfFunc ( userId , needChecks ) . stream ( ) . map ( t - > t . getId ( ) ) . collect ( toSet ( ) ) ;
originResSet . removeAll ( authorizedUdfs ) ;
break ;
default :
break ;
}
resultList . addAll ( originResSet ) ;
@ -1908,6 +1977,7 @@ public class ProcessService {
/ * *
* get user by user id
*
* @param userId user id
* @return User
* /
@ -1917,6 +1987,7 @@ public class ProcessService {
/ * *
* get resource by resoruce id
*
* @param resoruceId resource id
* @return Resource
* /
@ -1924,9 +1995,9 @@ public class ProcessService {
return resourceMapper . selectById ( resoruceId ) ;
}
/ * *
* list resources by ids
*
* @param resIds resIds
* @return resource list
* /
@ -1936,6 +2007,7 @@ public class ProcessService {
/ * *
* format task app id in task instance
*
* @param taskInstance
* @return
* /