Browse Source

[Feature-6471]Cache Process definition in master (#6485)

* feature-6471 Cache Process definition in master
3.0.0/version-upgrade
OS 3 years ago committed by GitHub
parent
commit
db04a5b04d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 41
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
  3. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
  4. 10
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
  5. 4
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
  6. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  7. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  8. 3
      dolphinscheduler-server/src/main/resources/master.properties
  9. 77
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  10. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
  11. 48
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  12. 4
      sql/dolphinscheduler_h2.sql
  13. 4
      sql/dolphinscheduler_mysql.sql
  14. 9
      sql/dolphinscheduler_postgre.sql

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -283,13 +283,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
switch (executeType) { switch (executeType) {
case REPEAT_RUNNING: case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams); result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
break; break;
case RECOVER_SUSPENDED_PROCESS: case RECOVER_SUSPENDED_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
break; break;
case START_FAILURE_TASK_PROCESS: case START_FAILURE_TASK_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS, startParams); result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
break; break;
case STOP: case STOP:
if (processInstance.getState() == ExecutionStatus.READY_STOP) { if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@ -409,10 +409,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param loginUser login user * @param loginUser login user
* @param instanceId instance id * @param instanceId instance id
* @param processDefinitionCode process definition code * @param processDefinitionCode process definition code
* @param version
* @param commandType command type * @param commandType command type
* @return insert result code * @return insert result code
*/ */
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, CommandType commandType, String startParams) { private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
//To add startParams only when repeat running is needed //To add startParams only when repeat running is needed
@ -427,6 +428,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setProcessDefinitionCode(processDefinitionCode); command.setProcessDefinitionCode(processDefinitionCode);
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setExecutorId(loginUser.getId()); command.setExecutorId(loginUser.getId());
command.setProcessDefinitionVersion(processVersion);
command.setProcessInstanceId(instanceId);
if (!processService.verifyIsNeedCreateCommand(command)) { if (!processService.verifyIsNeedCreateCommand(command)) {
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode); putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode);
@ -545,6 +548,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setWorkerGroup(workerGroup); command.setWorkerGroup(workerGroup);
command.setEnvironmentCode(environmentCode); command.setEnvironmentCode(environmentCode);
command.setDryRun(dryRun); command.setDryRun(dryRun);
ProcessDefinition processDefinition = processService.findProcessDefinitionByCode(processDefineCode);
if (processDefinition != null) {
command.setProcessDefinitionVersion(processDefinition.getVersion());
}
command.setProcessInstanceId(0);
Date start = null; Date start = null;
Date end = null; Date end = null;

41
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

@ -132,6 +132,12 @@ public class Command {
@TableField("dry_run") @TableField("dry_run")
private int dryRun; private int dryRun;
@TableField("process_instance_id")
private int processInstanceId;
@TableField("process_definition_version")
private int processDefinitionVersion;
public Command() { public Command() {
this.taskDependType = TaskDependType.TASK_POST; this.taskDependType = TaskDependType.TASK_POST;
this.failureStrategy = FailureStrategy.CONTINUE; this.failureStrategy = FailureStrategy.CONTINUE;
@ -152,7 +158,10 @@ public class Command {
String workerGroup, String workerGroup,
Long environmentCode, Long environmentCode,
Priority processInstancePriority, Priority processInstancePriority,
int dryRun) { int dryRun,
int processInstanceId,
int processDefinitionVersion
) {
this.commandType = commandType; this.commandType = commandType;
this.executorId = executorId; this.executorId = executorId;
this.processDefinitionCode = processDefinitionCode; this.processDefinitionCode = processDefinitionCode;
@ -168,6 +177,8 @@ public class Command {
this.environmentCode = environmentCode; this.environmentCode = environmentCode;
this.processInstancePriority = processInstancePriority; this.processInstancePriority = processInstancePriority;
this.dryRun = dryRun; this.dryRun = dryRun;
this.processInstanceId = processInstanceId;
this.processDefinitionVersion = processDefinitionVersion;
} }
public TaskDependType getTaskDependType() { public TaskDependType getTaskDependType() {
@ -298,6 +309,22 @@ public class Command {
this.dryRun = dryRun; this.dryRun = dryRun;
} }
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o) {
@ -353,8 +380,13 @@ public class Command {
if (processInstancePriority != command.processInstancePriority) { if (processInstancePriority != command.processInstancePriority) {
return false; return false;
} }
if (processInstanceId != command.processInstanceId) {
return false;
}
if (processDefinitionVersion != command.getProcessDefinitionVersion()) {
return false;
}
return !(updateTime != null ? !updateTime.equals(command.updateTime) : command.updateTime != null); return !(updateTime != null ? !updateTime.equals(command.updateTime) : command.updateTime != null);
} }
@Override @Override
@ -375,6 +407,8 @@ public class Command {
result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0); result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0);
result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0); result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0);
result = 31 * result + dryRun; result = 31 * result + dryRun;
result = 31 * result + processInstanceId;
result = 31 * result + processDefinitionVersion;
return result; return result;
} }
@ -397,7 +431,10 @@ public class Command {
+ ", workerGroup='" + workerGroup + '\'' + ", workerGroup='" + workerGroup + '\''
+ ", environmentCode='" + environmentCode + '\'' + ", environmentCode='" + environmentCode + '\''
+ ", dryRun='" + dryRun + '\'' + ", dryRun='" + dryRun + '\''
+ ", processInstanceId='" + processInstanceId + '\''
+ ", processDefinitionVersion='" + processDefinitionVersion + '\''
+ '}'; + '}';
} }
} }

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java

@ -31,13 +31,6 @@ import java.util.List;
*/ */
public interface CommandMapper extends BaseMapper<Command> { public interface CommandMapper extends BaseMapper<Command> {
/**
* get one command
* @return command
*/
Command getOneToRun();
/** /**
* count command state * count command state
* @param userId userId * @param userId userId

10
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml

@ -18,16 +18,6 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.CommandMapper"> <mapper namespace="org.apache.dolphinscheduler.dao.mapper.CommandMapper">
<select id="getOneToRun" resultType="org.apache.dolphinscheduler.dao.entity.Command">
select cmd.id, cmd.command_type, cmd.process_definition_code, cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
cmd.warning_type, cmd.warning_group_id, cmd.schedule_time, cmd.start_time, cmd.executor_id, cmd.update_time,
cmd.process_instance_priority, cmd.worker_group, cmd.environment_code, cmd.dry_run
from t_ds_command cmd
join t_ds_process_definition definition on cmd.process_definition_code = definition.code
where definition.release_state = 1 AND definition.flag = 1
order by cmd.update_time asc
limit 1
</select>
<select id="countCommandState" resultType="org.apache.dolphinscheduler.dao.entity.CommandCount"> <select id="countCommandState" resultType="org.apache.dolphinscheduler.dao.entity.CommandCount">
select cmd.command_type as command_type, count(1) as count select cmd.command_type as command_type, count(1) as count
from t_ds_command cmd, t_ds_process_definition process from t_ds_command cmd, t_ds_process_definition process

4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@ -147,7 +147,7 @@ public class CommandMapperTest {
createCommand(CommandType.START_PROCESS, processDefinition.getCode()); createCommand(CommandType.START_PROCESS, processDefinition.getCode());
Command actualCommand = commandMapper.getOneToRun(); List<Command> actualCommand = commandMapper.queryCommandPage(1,0);
assertNotNull(actualCommand); assertNotNull(actualCommand);
} }
@ -259,6 +259,8 @@ public class CommandMapperTest {
command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00")); command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00")); command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(0);
commandMapper.insert(command); commandMapper.insert(command);
return command; return command;

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -60,6 +60,9 @@ public class MasterConfig {
@Value("${master.reserved.memory:0.3}") @Value("${master.reserved.memory:0.3}")
private double masterReservedMemory; private double masterReservedMemory;
@Value("${master.cache.process.definition:true}")
private boolean masterCacheProcessDefinition;
public int getListenPort() { public int getListenPort() {
return listenPort; return listenPort;
} }
@ -150,4 +153,13 @@ public class MasterConfig {
public void setStateWheelInterval(int stateWheelInterval) { public void setStateWheelInterval(int stateWheelInterval) {
this.stateWheelInterval = stateWheelInterval; this.stateWheelInterval = stateWheelInterval;
} }
public boolean getMasterCacheProcessDefinition() {
return masterCacheProcessDefinition;
}
public void setMasterCacheProcessDefinition(boolean masterCacheProcessDefinition) {
this.masterCacheProcessDefinition = masterCacheProcessDefinition;
}
} }

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -92,11 +94,26 @@ public class MasterSchedulerService extends Thread {
*/ */
private ThreadPoolExecutor masterExecService; private ThreadPoolExecutor masterExecService;
/**
* process instance execution list
*/
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps; private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
/**
* process timeout check list
*/
ConcurrentHashMap<Integer, ProcessInstance> processTimeoutCheckList = new ConcurrentHashMap<>(); ConcurrentHashMap<Integer, ProcessInstance> processTimeoutCheckList = new ConcurrentHashMap<>();
/**
* task time out checkout list
*/
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>(); ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
/**
* key:code-version
* value: processDefinition
*/
HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new HashMap<>();
private StateWheelExecuteThread stateWheelExecuteThread; private StateWheelExecuteThread stateWheelExecuteThread;
/** /**
@ -112,7 +129,6 @@ public class MasterSchedulerService extends Thread {
taskTimeoutCheckList, taskTimeoutCheckList,
this.processInstanceExecMaps, this.processInstanceExecMaps,
masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
} }
@Override @Override
@ -165,7 +181,6 @@ public class MasterSchedulerService extends Thread {
*/ */
private void scheduleProcess() throws Exception { private void scheduleProcess() throws Exception {
int activeCount = masterExecService.getActiveCount();
// make sure to scan and delete command table in one transaction // make sure to scan and delete command table in one transaction
Command command = findOneCommand(); Command command = findOneCommand();
if (command != null) { if (command != null) {
@ -173,7 +188,12 @@ public class MasterSchedulerService extends Thread {
try { try {
ProcessInstance processInstance = processService.handleCommand(logger, ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(), getLocalAddress(),
this.masterConfig.getMasterExecThreads() - activeCount, command); command,
processDefinitionCacheMaps);
if (!masterConfig.getMasterCacheProcessDefinition()
&& processDefinitionCacheMaps.size() > 0) {
processDefinitionCacheMaps.clear();
}
if (processInstance != null) { if (processInstance != null) {
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread( WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
processInstance processInstance

3
dolphinscheduler-server/src/main/resources/master.properties

@ -39,6 +39,9 @@
# master commit task interval, the unit is millisecond # master commit task interval, the unit is millisecond
#master.task.commit.interval=1000 #master.task.commit.interval=1000
# master cache process definition, default: true
#master.cache.process.definition=true
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
#master.max.cpuload.avg=-1 #master.max.cpuload.avg=-1

77
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -208,13 +208,12 @@ public class ProcessService {
* *
* @param logger logger * @param logger logger
* @param host host * @param host host
* @param validThreadNum validThreadNum
* @param command found command * @param command found command
* @param processDefinitionCacheMaps
* @return process instance * @return process instance
*/ */
@Transactional(rollbackFor = Exception.class) public ProcessInstance handleCommand(Logger logger, String host, Command command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) { ProcessInstance processInstance = constructProcessInstance(command, host, processDefinitionCacheMaps);
ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null // cannot construct process instance, return null
if (processInstance == null) { if (processInstance == null) {
logger.error("scan command, command parameter is error: {}", command); logger.error("scan command, command parameter is error: {}", command);
@ -235,7 +234,6 @@ public class ProcessService {
* @param command command * @param command command
* @param message message * @param message message
*/ */
@Transactional(rollbackFor = Exception.class)
public void moveToErrorCommand(Command command, String message) { public void moveToErrorCommand(Command command, String message) {
ErrorCommand errorCommand = new ErrorCommand(command, message); ErrorCommand errorCommand = new ErrorCommand(command, message);
this.errorCommandMapper.insert(errorCommand); this.errorCommandMapper.insert(errorCommand);
@ -286,15 +284,6 @@ public class ProcessService {
return result; return result;
} }
/**
* find one command from queue list
*
* @return command
*/
public Command findOneCommand() {
return commandMapper.getOneToRun();
}
/** /**
* get command page * get command page
* *
@ -547,7 +536,9 @@ public class ProcessService {
processInstance.getWorkerGroup(), processInstance.getWorkerGroup(),
processInstance.getEnvironmentCode(), processInstance.getEnvironmentCode(),
processInstance.getProcessInstancePriority(), processInstance.getProcessInstancePriority(),
processInstance.getDryRun() processInstance.getDryRun(),
processInstance.getId(),
processInstance.getProcessDefinitionVersion()
); );
saveCommand(command); saveCommand(command);
return; return;
@ -746,39 +737,28 @@ public class ProcessService {
* *
* @param command command * @param command command
* @param host host * @param host host
* @param processDefinitionCacheMaps
* @return process instance * @return process instance
*/ */
private ProcessInstance constructProcessInstance(Command command, String host) { private ProcessInstance constructProcessInstance(Command command, String host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
ProcessInstance processInstance; ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType(); CommandType commandType = command.getCommandType();
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam()); String key = String.format("%d-%d", command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
if (processDefinitionCacheMaps.containsKey(key)) {
ProcessDefinition processDefinition = getProcessDefinitionByCommand(command.getProcessDefinitionCode(), cmdParam); processDefinition = processDefinitionCacheMaps.get(key);
} else {
processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
if (processDefinition != null) {
processDefinitionCacheMaps.put(key, processDefinition);
}
}
if (processDefinition == null) { if (processDefinition == null) {
logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode()); logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
return null; return null;
} }
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
if (cmdParam != null) { int processInstanceId = command.getProcessInstanceId();
int processInstanceId = 0;
// recover from failure or pause tasks
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
String processId = cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING);
processInstanceId = Integer.parseInt(processId);
if (processInstanceId == 0) {
logger.error("command parameter is error, [ ProcessInstanceId ] is 0");
return null;
}
} else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
// sub process map
String pId = cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS);
processInstanceId = Integer.parseInt(pId);
} else if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) {
// waiting thread command
String pId = cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD);
processInstanceId = Integer.parseInt(pId);
}
if (processInstanceId == 0) { if (processInstanceId == 0) {
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam); processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
} else { } else {
@ -786,8 +766,9 @@ public class ProcessService {
if (processInstance == null) { if (processInstance == null) {
return processInstance; return processInstance;
} }
}
if (cmdParam != null) {
CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command); CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
// reset global params while repeat running is needed by cmdParam // reset global params while repeat running is needed by cmdParam
if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) { if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
setGlobalParamIfCommanded(processDefinition, cmdParam); setGlobalParamIfCommanded(processDefinition, cmdParam);
@ -814,20 +795,14 @@ public class ProcessService {
if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) { if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
processInstance.setCommandParam(command.getCommandParam()); processInstance.setCommandParam(command.getCommandParam());
} }
} else {
// generate one new process instance
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
}
if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) { if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
logger.error("command parameter check failed!"); logger.error("command parameter check failed!");
return null; return null;
} }
if (command.getScheduleTime() != null) { if (command.getScheduleTime() != null) {
processInstance.setScheduleTime(command.getScheduleTime()); processInstance.setScheduleTime(command.getScheduleTime());
} }
processInstance.setHost(host); processInstance.setHost(host);
ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION; ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION;
int runTime = processInstance.getRunTimes(); int runTime = processInstance.getRunTimes();
switch (commandType) { switch (commandType) {
@ -1274,7 +1249,7 @@ public class ProcessService {
} }
} }
String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams); String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
int subProcessInstanceId = childInstance == null ? 0 : childInstance.getId();
return new Command( return new Command(
commandType, commandType,
TaskDependType.TASK_POST, TaskDependType.TASK_POST,
@ -1288,7 +1263,9 @@ public class ProcessService {
task.getWorkerGroup(), task.getWorkerGroup(),
task.getEnvironmentCode(), task.getEnvironmentCode(),
parentProcessInstance.getProcessInstancePriority(), parentProcessInstance.getProcessInstancePriority(),
parentProcessInstance.getDryRun() parentProcessInstance.getDryRun(),
subProcessInstanceId,
parentProcessInstance.getProcessDefinitionVersion()
); );
} }
@ -1886,6 +1863,8 @@ public class ProcessService {
//2 insert into recover command //2 insert into recover command
Command cmd = new Command(); Command cmd = new Command();
cmd.setProcessDefinitionCode(processDefinition.getCode()); cmd.setProcessDefinitionCode(processDefinition.getCode());
cmd.setProcessDefinitionVersion(processDefinition.getVersion());
cmd.setProcessInstanceId(processInstance.getId());
cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId())); cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
cmd.setExecutorId(processInstance.getExecutorId()); cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS); cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java

@ -101,6 +101,7 @@ public class ProcessScheduleJob implements Job {
command.setWorkerGroup(workerGroup); command.setWorkerGroup(workerGroup);
command.setWarningType(schedule.getWarningType()); command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority()); command.setProcessInstancePriority(schedule.getProcessInstancePriority());
command.setProcessDefinitionVersion(processDefinition.getVersion());
getProcessService().createCommand(command); getProcessService().createCommand(command);
} }

48
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -82,7 +82,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
/** /**
* process service test * process service test
@ -119,6 +118,8 @@ public class ProcessServiceTest {
@Mock @Mock
private ResourceMapper resourceMapper; private ResourceMapper resourceMapper;
private HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new HashMap<>();
@Test @Test
public void testCreateSubCommand() { public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance(); ProcessInstance parentInstance = new ProcessInstance();
@ -240,56 +241,65 @@ public class ProcessServiceTest {
//cannot construct process instance, return null; //cannot construct process instance, return null;
String host = "127.0.0.1"; String host = "127.0.0.1";
int validThreadNum = 1;
Command command = new Command(); Command command = new Command();
command.setProcessDefinitionCode(222); command.setProcessDefinitionCode(222);
command.setCommandType(CommandType.REPEAT_RUNNING); command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\"" command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}"); + CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}");
Assert.assertNull(processService.handleCommand(logger, host, validThreadNum, command)); Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps));
int definitionVersion = 1;
long definitionCode = 123;
int processInstanceId = 222;
//there is not enough thread for this command //there is not enough thread for this command
Command command1 = new Command(); Command command1 = new Command();
command1.setProcessDefinitionCode(123); command1.setProcessDefinitionCode(definitionCode);
command1.setProcessDefinitionVersion(definitionVersion);
command1.setCommandParam("{\"ProcessInstanceId\":222}"); command1.setCommandParam("{\"ProcessInstanceId\":222}");
command1.setCommandType(CommandType.START_PROCESS); command1.setCommandType(CommandType.START_PROCESS);
ProcessDefinition processDefinition = new ProcessDefinition(); ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(123); processDefinition.setId(123);
processDefinition.setName("test"); processDefinition.setName("test");
processDefinition.setVersion(1); processDefinition.setVersion(definitionVersion);
processDefinition.setCode(11L); processDefinition.setCode(definitionCode);
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
ProcessInstance processInstance = new ProcessInstance(); ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(222); processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionCode(11L); processInstance.setProcessDefinitionCode(definitionCode);
processInstance.setProcessDefinitionVersion(1); processInstance.setProcessDefinitionVersion(definitionVersion);
Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition);
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command1)); Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
Command command2 = new Command(); Command command2 = new Command();
command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command2.setProcessDefinitionCode(123); command2.setProcessDefinitionCode(definitionCode);
command2.setProcessDefinitionVersion(definitionVersion);
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS); command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command2)); Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps));
Command command3 = new Command(); Command command3 = new Command();
command3.setProcessDefinitionCode(123); command3.setProcessDefinitionCode(definitionCode);
command3.setProcessDefinitionVersion(definitionVersion);
command3.setProcessInstanceId(processInstanceId);
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}"); command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command3)); Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps));
Command command4 = new Command(); Command command4 = new Command();
command4.setProcessDefinitionCode(123); command4.setProcessDefinitionCode(definitionCode);
command4.setProcessDefinitionVersion(definitionVersion);
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command4.setCommandType(CommandType.REPEAT_RUNNING); command4.setCommandType(CommandType.REPEAT_RUNNING);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command4)); command4.setProcessInstanceId(processInstanceId);
Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps));
Command command5 = new Command(); Command command5 = new Command();
command5.setProcessDefinitionCode(123); command5.setProcessDefinitionCode(definitionCode);
command5.setProcessDefinitionVersion(definitionVersion);
HashMap<String, String> startParams = new HashMap<>(); HashMap<String, String> startParams = new HashMap<>();
startParams.put("startParam1", "testStartParam1"); startParams.put("startParam1", "testStartParam1");
HashMap<String, String> commandParams = new HashMap<>(); HashMap<String, String> commandParams = new HashMap<>();
@ -297,7 +307,7 @@ public class ProcessServiceTest {
command5.setCommandParam(JSONUtils.toJsonString(commandParams)); command5.setCommandParam(JSONUtils.toJsonString(commandParams));
command5.setCommandType(CommandType.START_PROCESS); command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO); command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
ProcessInstance processInstance1 = processService.handleCommand(logger, host, validThreadNum, command5); ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\"")); Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
} }

4
sql/dolphinscheduler_h2.sql

@ -328,6 +328,8 @@ CREATE TABLE t_ds_command
worker_group varchar(64), worker_group varchar(64),
environment_code bigint(20) DEFAULT '-1', environment_code bigint(20) DEFAULT '-1',
dry_run int NULL DEFAULT 0, dry_run int NULL DEFAULT 0,
process_instance_id int(11) DEFAULT 0,
process_definition_version int(11) DEFAULT 0,
PRIMARY KEY (id), PRIMARY KEY (id),
KEY priority_id_index (process_instance_priority, id) KEY priority_id_index (process_instance_priority, id)
); );
@ -381,6 +383,8 @@ CREATE TABLE t_ds_error_command
environment_code bigint(20) DEFAULT '-1', environment_code bigint(20) DEFAULT '-1',
message text, message text,
dry_run int NULL DEFAULT 0, dry_run int NULL DEFAULT 0,
process_instance_id int(11) DEFAULT 0,
process_definition_version int(11) DEFAULT 0,
PRIMARY KEY (id) PRIMARY KEY (id)
); );

4
sql/dolphinscheduler_mysql.sql

@ -333,6 +333,8 @@ CREATE TABLE `t_ds_command` (
`worker_group` varchar(64) COMMENT 'worker group', `worker_group` varchar(64) COMMENT 'worker group',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code', `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag:0 normal, 1 dry run', `dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag:0 normal, 1 dry run',
`process_instance_id` int(11) DEFAULT 0 COMMENT 'process instance id',
`process_definition_version` int(11) DEFAULT 0 COMMENT 'process definition version',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@ -384,6 +386,8 @@ CREATE TABLE `t_ds_error_command` (
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code', `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`message` text COMMENT 'message', `message` text COMMENT 'message',
`dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run', `dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run',
`process_instance_id` int(11) DEFAULT 0 COMMENT 'process instance id: 0',
`process_definition_version` int(11) DEFAULT 0 COMMENT 'process definition version',
PRIMARY KEY (`id`) USING BTREE PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC; ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

9
sql/dolphinscheduler_postgre.sql

@ -255,6 +255,8 @@ CREATE TABLE t_ds_command (
worker_group varchar(64), worker_group varchar(64),
environment_code bigint DEFAULT '-1', environment_code bigint DEFAULT '-1',
dry_run int DEFAULT '0' , dry_run int DEFAULT '0' ,
process_instance_id int DEFAULT 0,
process_definition_version int DEFAULT 0,
PRIMARY KEY (id) PRIMARY KEY (id)
) ; ) ;
@ -286,7 +288,6 @@ DROP TABLE IF EXISTS t_ds_error_command;
CREATE TABLE t_ds_error_command ( CREATE TABLE t_ds_error_command (
id int NOT NULL , id int NOT NULL ,
command_type int DEFAULT NULL , command_type int DEFAULT NULL ,
executor_id int DEFAULT NULL ,
process_definition_code bigint NOT NULL , process_definition_code bigint NOT NULL ,
command_param text , command_param text ,
task_depend_type int DEFAULT NULL , task_depend_type int DEFAULT NULL ,
@ -295,12 +296,14 @@ CREATE TABLE t_ds_error_command (
warning_group_id int DEFAULT NULL , warning_group_id int DEFAULT NULL ,
schedule_time timestamp DEFAULT NULL , schedule_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL , start_time timestamp DEFAULT NULL ,
executor_id int DEFAULT NULL ,
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL , process_instance_priority int DEFAULT NULL ,
worker_group varchar(64), worker_group varchar(64),
environment_code bigint DEFAULT '-1', environment_code bigint DEFAULT '-1',
message text , dry_run int DEFAULT '0' ,
dry_ru int DEFAULT '0' , process_instance_id int DEFAULT 0,
process_definition_version int DEFAULT 0,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
-- --

Loading…
Cancel
Save