Browse Source

[Fix-5519] The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code (#5937)

* fix: the t_ds_schedules table, process_definition_id change to process_definition_code

* fix checkstyle

* fix: recovery code

* fix UT

* fix: The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code

* fix comment

* fix checkstyle

* fix: remove duplacated lines

* fix: remove TODO

Co-authored-by: wen-hemin <wenhemin@apache.com>
2.0.7-release
wen-hemin 3 years ago committed by GitHub
parent
commit
92b92b5f5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 71
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
  3. 99
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
  4. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
  5. 4
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml
  6. 73
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
  7. 11
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java
  8. 22
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
  9. 72
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  10. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
  11. 60
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  12. 4
      sql/dolphinscheduler_mysql.sql
  13. 4
      sql/dolphinscheduler_postgre.sql

26
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -160,7 +160,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* create command
*/
int create = this.createCommand(commandType, processDefinition.getId(),
int create = this.createCommand(commandType, processDefinition.getCode(),
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
@ -276,13 +276,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
switch (executeType) {
case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING, startParams);
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams);
break;
case RECOVER_SUSPENDED_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
break;
case START_FAILURE_TASK_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
break;
case STOP:
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@ -394,11 +394,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*
* @param loginUser login user
* @param instanceId instance id
* @param processDefinitionId process definition id
* @param processDefinitionCode process definition code
* @param commandType command type
* @return insert result code
*/
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType, String startParams) {
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, CommandType commandType, String startParams) {
Map<String, Object> result = new HashMap<>();
//To add startParams only when repeat running is needed
@ -410,12 +410,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Command command = new Command();
command.setCommandType(commandType);
command.setProcessDefinitionId(processDefinitionId);
command.setProcessDefinitionCode(processDefinitionCode);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setExecutorId(loginUser.getId());
if (!processService.verifyIsNeedCreateCommand(command)) {
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId);
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode);
return result;
}
@ -475,7 +475,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* create command
*
* @param commandType commandType
* @param processDefineId processDefineId
* @param processDefineCode processDefineCode
* @param nodeDep nodeDep
* @param failureStrategy failureStrategy
* @param startNodeList startNodeList
@ -488,7 +488,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param workerGroup workerGroup
* @return command id
*/
private int createCommand(CommandType commandType, int processDefineId,
private int createCommand(CommandType commandType, long processDefineCode,
TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType,
int executorId, int warningGroupId,
@ -506,7 +506,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} else {
command.setCommandType(commandType);
}
command.setProcessDefinitionId(processDefineId);
command.setProcessDefinitionCode(processDefineCode);
if (nodeDep != null) {
command.setTaskDependType(nodeDep);
}
@ -549,7 +549,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
} else if (runMode == RunMode.RUN_MODE_PARALLEL) {
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineId); // TODO: next pr change to code
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineCode);
List<Date> listDate = new LinkedList<>();
if (!CollectionUtils.isEmpty(schedules)) {
for (Schedule item : schedules) {
@ -580,7 +580,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
}
} else {
logger.error("there is not valid schedule date for the process definition: id:{}", processDefineId);
logger.error("there is not valid schedule date for the process definition code:{}", processDefineCode);
}
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));

71
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

@ -14,15 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import org.apache.dolphinscheduler.common.enums.*;
import java.util.Date;
/**
* command
@ -33,7 +39,7 @@ public class Command {
/**
* id
*/
@TableId(value="id", type=IdType.AUTO)
@TableId(value = "id", type = IdType.AUTO)
private int id;
/**
@ -43,10 +49,10 @@ public class Command {
private CommandType commandType;
/**
* process definition id
* process definition code
*/
@TableField("process_definition_id")
private int processDefinitionId;
@TableField("process_definition_code")
private long processDefinitionCode;
/**
* executor id
@ -126,7 +132,7 @@ public class Command {
TaskDependType taskDependType,
FailureStrategy failureStrategy,
int executorId,
int processDefinitionId,
long processDefinitionCode,
String commandParam,
WarningType warningType,
int warningGroupId,
@ -135,7 +141,7 @@ public class Command {
Priority processInstancePriority) {
this.commandType = commandType;
this.executorId = executorId;
this.processDefinitionId = processDefinitionId;
this.processDefinitionCode = processDefinitionCode;
this.commandParam = commandParam;
this.warningType = warningType;
this.warningGroupId = warningGroupId;
@ -148,7 +154,6 @@ public class Command {
this.processInstancePriority = processInstancePriority;
}
public TaskDependType getTaskDependType() {
return taskDependType;
}
@ -173,15 +178,14 @@ public class Command {
this.commandType = commandType;
}
public int getProcessDefinitionId() {
return processDefinitionId;
public long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionId(int processDefinitionId) {
this.processDefinitionId = processDefinitionId;
public void setProcessDefinitionCode(long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public FailureStrategy getFailureStrategy() {
return failureStrategy;
}
@ -276,7 +280,7 @@ public class Command {
if (id != command.id) {
return false;
}
if (processDefinitionId != command.processDefinitionId) {
if (processDefinitionCode != command.processDefinitionCode) {
return false;
}
if (executorId != command.executorId) {
@ -320,7 +324,7 @@ public class Command {
public int hashCode() {
int result = id;
result = 31 * result + (commandType != null ? commandType.hashCode() : 0);
result = 31 * result + processDefinitionId;
result = 31 * result + Long.hashCode(processDefinitionCode);
result = 31 * result + executorId;
result = 31 * result + (commandParam != null ? commandParam.hashCode() : 0);
result = 31 * result + (taskDependType != null ? taskDependType.hashCode() : 0);
@ -334,24 +338,25 @@ public class Command {
result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0);
return result;
}
@Override
public String toString() {
return "Command{" +
"id=" + id +
", commandType=" + commandType +
", processDefinitionId=" + processDefinitionId +
", executorId=" + executorId +
", commandParam='" + commandParam + '\'' +
", taskDependType=" + taskDependType +
", failureStrategy=" + failureStrategy +
", warningType=" + warningType +
", warningGroupId=" + warningGroupId +
", scheduleTime=" + scheduleTime +
", startTime=" + startTime +
", processInstancePriority=" + processInstancePriority +
", updateTime=" + updateTime +
", workerGroup='" + workerGroup + '\'' +
'}';
return "Command{"
+ "id=" + id
+ ", commandType=" + commandType
+ ", processDefinitionCode=" + processDefinitionCode
+ ", executorId=" + executorId
+ ", commandParam='" + commandParam + '\''
+ ", taskDependType=" + taskDependType
+ ", failureStrategy=" + failureStrategy
+ ", warningType=" + warningType
+ ", warningGroupId=" + warningGroupId
+ ", scheduleTime=" + scheduleTime
+ ", startTime=" + startTime
+ ", processInstancePriority=" + processInstancePriority
+ ", updateTime=" + updateTime
+ ", workerGroup='" + workerGroup + '\''
+ '}';
}
}

99
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java

@ -14,15 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.*;
import java.util.Date;
/**
* command
@ -33,7 +39,7 @@ public class ErrorCommand {
/**
* id
*/
@TableId(value="id", type = IdType.INPUT)
@TableId(value = "id", type = IdType.INPUT)
private int id;
/**
@ -42,9 +48,9 @@ public class ErrorCommand {
private CommandType commandType;
/**
* process definition id
* process definition code
*/
private int processDefinitionId;
private long processDefinitionCode;
/**
* executor id
@ -79,13 +85,13 @@ public class ErrorCommand {
/**
* schedule time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date scheduleTime;
/**
* start time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date startTime;
/**
@ -96,7 +102,7 @@ public class ErrorCommand {
/**
* update time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date updateTime;
/**
@ -111,11 +117,11 @@ public class ErrorCommand {
public ErrorCommand(){}
public ErrorCommand(Command command, String message){
public ErrorCommand(Command command, String message) {
this.id = command.getId();
this.commandType = command.getCommandType();
this.executorId = command.getExecutorId();
this.processDefinitionId = command.getProcessDefinitionId();
this.processDefinitionCode = command.getProcessDefinitionCode();
this.commandParam = command.getCommandParam();
this.warningType = command.getWarningType();
this.warningGroupId = command.getWarningGroupId();
@ -128,34 +134,6 @@ public class ErrorCommand {
this.message = message;
}
public ErrorCommand(
CommandType commandType,
TaskDependType taskDependType,
FailureStrategy failureStrategy,
int executorId,
int processDefinitionId,
String commandParam,
WarningType warningType,
int warningGroupId,
Date scheduleTime,
Priority processInstancePriority,
String message){
this.commandType = commandType;
this.executorId = executorId;
this.processDefinitionId = processDefinitionId;
this.commandParam = commandParam;
this.warningType = warningType;
this.warningGroupId = warningGroupId;
this.scheduleTime = scheduleTime;
this.taskDependType = taskDependType;
this.failureStrategy = failureStrategy;
this.startTime = new Date();
this.updateTime = new Date();
this.processInstancePriority = processInstancePriority;
this.message = message;
}
public TaskDependType getTaskDependType() {
return taskDependType;
}
@ -180,15 +158,14 @@ public class ErrorCommand {
this.commandType = commandType;
}
public int getProcessDefinitionId() {
return processDefinitionId;
public long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionId(int processDefinitionId) {
this.processDefinitionId = processDefinitionId;
public void setProcessDefinitionCode(long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public FailureStrategy getFailureStrategy() {
return failureStrategy;
}
@ -279,22 +256,22 @@ public class ErrorCommand {
@Override
public String toString() {
return "ErrorCommand{" +
"id=" + id +
", commandType=" + commandType +
", processDefinitionId=" + processDefinitionId +
", executorId=" + executorId +
", commandParam='" + commandParam + '\'' +
", taskDependType=" + taskDependType +
", failureStrategy=" + failureStrategy +
", warningType=" + warningType +
", warningGroupId=" + warningGroupId +
", scheduleTime=" + scheduleTime +
", startTime=" + startTime +
", processInstancePriority=" + processInstancePriority +
", updateTime=" + updateTime +
", message='" + message + '\'' +
", workerGroup='" + workerGroup + '\'' +
'}';
return "ErrorCommand{"
+ "id=" + id
+ ", commandType=" + commandType
+ ", processDefinitionCode=" + processDefinitionCode
+ ", executorId=" + executorId
+ ", commandParam='" + commandParam + '\''
+ ", taskDependType=" + taskDependType
+ ", failureStrategy=" + failureStrategy
+ ", warningType=" + warningType
+ ", warningGroupId=" + warningGroupId
+ ", scheduleTime=" + scheduleTime
+ ", startTime=" + startTime
+ ", processInstancePriority=" + processInstancePriority
+ ", updateTime=" + updateTime
+ ", message='" + message + '\''
+ ", workerGroup='" + workerGroup + '\''
+ '}';
}
}

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml

@ -19,11 +19,11 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.CommandMapper">
<select id="getOneToRun" resultType="org.apache.dolphinscheduler.dao.entity.Command">
select cmd.id, cmd.command_type, cmd.process_definition_id, cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
select cmd.id, cmd.command_type, cmd.process_definition_code, cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
cmd.warning_type, cmd.warning_group_id, cmd.schedule_time, cmd.start_time, cmd.executor_id, cmd.update_time,
cmd.process_instance_priority, cmd.worker_group
from t_ds_command cmd
join t_ds_process_definition definition on cmd.process_definition_id = definition.id
join t_ds_process_definition definition on cmd.process_definition_code = definition.code
where definition.release_state = 1 AND definition.flag = 1
order by cmd.update_time asc
limit 1
@ -31,7 +31,7 @@
<select id="countCommandState" resultType="org.apache.dolphinscheduler.dao.entity.CommandCount">
select cmd.command_type as command_type, count(1) as count
from t_ds_command cmd, t_ds_process_definition process
where cmd.process_definition_id = process.id
where cmd.process_definition_code = process.code
<if test="projectCodeArray != null and projectCodeArray.length != 0">
and process.project_code in
<foreach collection="projectCodeArray" index="index" item="i" open="(" close=")" separator=",">

4
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml

@ -21,7 +21,7 @@
<select id="countCommandState" resultType="org.apache.dolphinscheduler.dao.entity.CommandCount">
select cmd.command_type as command_type, count(1) as count
from t_ds_error_command cmd, t_ds_process_definition process
where cmd.process_definition_id = process.id
where cmd.process_definition_code = process.code
<if test="projectCodeArray != null and projectCodeArray.length != 0">
and process.project_code in
<foreach collection="projectCodeArray" index="index" item="i" open="(" close=")" separator=",">
@ -33,4 +33,4 @@
</if>
group by cmd.command_type
</select>
</mapper>
</mapper>

73
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@ -14,15 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CommandCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.common.enums.*;
import org.junit.Assert;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
@ -31,14 +50,6 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
/**
* command mapper test
*/
@ -48,24 +59,21 @@ import static org.junit.Assert.*;
@Rollback(true)
public class CommandMapperTest {
@Autowired
CommandMapper commandMapper;
@Autowired
ProcessDefinitionMapper processDefinitionMapper;
/**
* test insert
*/
@Test
public void testInsert(){
public void testInsert() {
Command command = createCommand();
assertThat(command.getId(),greaterThan(0));
}
/**
* test select by id
*/
@ -76,14 +84,14 @@ public class CommandMapperTest {
Command actualCommand = commandMapper.selectById(expectedCommand.getId());
assertNotNull(actualCommand);
assertEquals(expectedCommand.getProcessDefinitionId(), actualCommand.getProcessDefinitionId());
assertEquals(expectedCommand.getProcessDefinitionCode(), actualCommand.getProcessDefinitionCode());
}
/**
* test update
*/
@Test
public void testUpdate(){
public void testUpdate() {
Command expectedCommand = createCommand();
@ -103,7 +111,7 @@ public class CommandMapperTest {
* test delete
*/
@Test
public void testDelete(){
public void testDelete() {
Command expectedCommand = createCommand();
commandMapper.deleteById(expectedCommand.getId());
@ -124,7 +132,6 @@ public class CommandMapperTest {
Map<Integer, Command> commandMap = createCommandMap(count);
List<Command> actualCommands = commandMapper.selectList(null);
assertThat(actualCommands.size(), greaterThanOrEqualTo(count));
@ -138,7 +145,7 @@ public class CommandMapperTest {
ProcessDefinition processDefinition = createProcessDefinition();
Command expectedCommand = createCommand(CommandType.START_PROCESS,processDefinition.getId());
createCommand(CommandType.START_PROCESS, processDefinition.getCode());
Command actualCommand = commandMapper.getOneToRun();
@ -154,7 +161,7 @@ public class CommandMapperTest {
ProcessDefinition processDefinition = createProcessDefinition();
CommandCount expectedCommandCount = createCommandMap(count, CommandType.START_PROCESS, processDefinition.getId());
createCommandMap(count, CommandType.START_PROCESS, processDefinition.getCode());
Long[] projectCodeArray = {processDefinition.getProjectCode()};
@ -167,23 +174,22 @@ public class CommandMapperTest {
assertThat(actualCommandCounts.size(),greaterThanOrEqualTo(1));
}
/**
* create command map
* @param count map count
* @param commandType comman type
* @param processDefinitionId process definition id
* @param processDefinitionCode process definition code
* @return command map
*/
private CommandCount createCommandMap(
Integer count,
CommandType commandType,
Integer processDefinitionId){
long processDefinitionCode) {
CommandCount commandCount = new CommandCount();
for (int i = 0 ;i < count ;i++){
createCommand(commandType,processDefinitionId);
for (int i = 0;i < count;i++) {
createCommand(commandType, processDefinitionCode);
}
commandCount.setCommandType(commandType);
commandCount.setCount(count);
@ -195,7 +201,7 @@ public class CommandMapperTest {
* create process definition
* @return process definition
*/
private ProcessDefinition createProcessDefinition(){
private ProcessDefinition createProcessDefinition() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
processDefinition.setReleaseState(ReleaseState.ONLINE);
@ -215,22 +221,21 @@ public class CommandMapperTest {
* @param count map count
* @return command map
*/
private Map<Integer,Command> createCommandMap(Integer count){
private Map<Integer,Command> createCommandMap(Integer count) {
Map<Integer,Command> commandMap = new HashMap<>();
for (int i = 0; i < count ;i++){
for (int i = 0;i < count;i++) {
Command command = createCommand();
commandMap.put(command.getId(),command);
}
return commandMap;
}
/**
* create command
* @return
*/
private Command createCommand(){
private Command createCommand() {
return createCommand(CommandType.START_PROCESS,1);
}
@ -238,11 +243,11 @@ public class CommandMapperTest {
* create command
* @return Command
*/
private Command createCommand(CommandType commandType,Integer processDefinitionId){
private Command createCommand(CommandType commandType, long processDefinitionCode) {
Command command = new Command();
command.setCommandType(commandType);
command.setProcessDefinitionId(processDefinitionId);
command.setProcessDefinitionCode(processDefinitionCode);
command.setExecutorId(4);
command.setCommandParam("test command param");
command.setTaskDependType(TaskDependType.TASK_ONLY);
@ -259,6 +264,4 @@ public class CommandMapperTest {
return command;
}
}
}

11
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java

@ -21,6 +21,10 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.CommandCount;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -30,9 +34,6 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@ -81,7 +82,7 @@ public class ErrorCommandMapperTest {
processDefinition.setCreateTime(new Date());
processDefinitionMapper.insert(processDefinition);
errorCommand.setProcessDefinitionId(processDefinition.getId());
errorCommand.setProcessDefinitionCode(processDefinition.getCode());
errorCommandMapper.updateById(errorCommand);
@ -103,4 +104,4 @@ public class ErrorCommandMapperTest {
Assert.assertNotEquals(commandCounts.size(), 0);
Assert.assertNotEquals(commandCounts2.size(), 0);
}
}
}

22
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java

@ -18,26 +18,16 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
/**
* master test
*/
@ -56,7 +46,7 @@ public class MasterCommandTest {
Command cmd = new Command();
cmd.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
cmd.setCommandParam("{\"ProcessInstanceId\":325}");
cmd.setProcessDefinitionId(63);
cmd.setProcessDefinitionCode(63);
commandMapper.insert(cmd);
@ -66,7 +56,7 @@ public class MasterCommandTest {
public void RecoverSuspendCommand(){
Command cmd = new Command();
cmd.setProcessDefinitionId(44);
cmd.setProcessDefinitionCode(44);
cmd.setCommandParam("{\"ProcessInstanceId\":290}");
cmd.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
@ -80,7 +70,7 @@ public class MasterCommandTest {
public void startNewProcessCommand(){
Command cmd = new Command();
cmd.setCommandType(CommandType.START_PROCESS);
cmd.setProcessDefinitionId(167);
cmd.setProcessDefinitionCode(167);
cmd.setFailureStrategy(FailureStrategy.CONTINUE);
cmd.setWarningType(WarningType.NONE);
cmd.setWarningGroupId(4);
@ -94,7 +84,7 @@ public class MasterCommandTest {
Command cmd = new Command();
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
cmd.setCommandParam("{\"ProcessInstanceId\":816}");
cmd.setProcessDefinitionId(15);
cmd.setProcessDefinitionCode(15);
commandMapper.insert(cmd);
}
@ -105,7 +95,7 @@ public class MasterCommandTest {
cmd.setCommandType(CommandType.START_PROCESS);
cmd.setFailureStrategy(FailureStrategy.CONTINUE);
cmd.setWarningType(WarningType.ALL);
cmd.setProcessDefinitionId(72);
cmd.setProcessDefinitionCode(72);
cmd.setExecutorId(10);
commandMapper.insert(cmd);
}

72
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -270,7 +270,7 @@ public class ProcessService {
* @return if thread is enough
*/
private boolean checkThreadNum(Command command, int validThreadNum) {
int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId());
int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionCode());
return validThreadNum >= commandThreadCount;
}
@ -469,12 +469,14 @@ public class ProcessService {
/**
* calculate sub process number in the process define.
*
* @param processDefinitionId processDefinitionId
* @param processDefinitionCode processDefinitionCode
* @return process thread num count
*/
private Integer workProcessThreadNumCount(Integer processDefinitionId) {
private Integer workProcessThreadNumCount(long processDefinitionCode) {
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
List<Integer> ids = new ArrayList<>();
recurseFindSubProcessId(processDefinitionId, ids);
recurseFindSubProcessId(processDefinition.getId(), ids);
return ids.size() + 1;
}
@ -497,7 +499,6 @@ public class ProcessService {
ids.add(subProcessParam.getProcessDefinitionId());
recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(), ids);
}
}
}
}
@ -529,7 +530,7 @@ public class ProcessService {
processInstance.getTaskDependType(),
processInstance.getFailureStrategy(),
processInstance.getExecutorId(),
processInstance.getProcessDefinition().getId(),
processInstance.getProcessDefinition().getCode(),
JSONUtils.toJsonString(cmdParam),
processInstance.getWarningType(),
processInstance.getWarningGroupId(),
@ -713,13 +714,10 @@ public class ProcessService {
CommandType commandType = command.getCommandType();
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
ProcessDefinition processDefinition = null;
if (command.getProcessDefinitionId() != 0) {
processDefinition = processDefineMapper.selectById(command.getProcessDefinitionId());
if (processDefinition == null) {
logger.error("cannot find the work process define! define id : {}", command.getProcessDefinitionId());
return null;
}
ProcessDefinition processDefinition = getProcessDefinitionByCommand(command.getProcessDefinitionCode(), cmdParam);
if (processDefinition == null) {
logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
return null;
}
if (cmdParam != null) {
@ -741,6 +739,7 @@ public class ProcessService {
String pId = cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD);
processInstanceId = Integer.parseInt(pId);
}
if (processInstanceId == 0) {
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
} else {
@ -868,6 +867,40 @@ public class ProcessService {
return processInstance;
}
/**
* get process definition by command
* If it is a fault-tolerant command, get the specified version of ProcessDefinition through ProcessInstance
* Otherwise, get the latest version of ProcessDefinition
*
* @param processDefinitionCode
* @param cmdParam
* @return ProcessDefinition
*/
private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
if (cmdParam != null) {
int processInstanceId = 0;
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING));
} else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS));
} else if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) {
processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD));
}
if (processInstanceId != 0) {
ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
return null;
}
return processDefineLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
}
}
return processDefineMapper.queryByCode(processDefinitionCode);
}
/**
* return complement data if the process start with complement data
*
@ -1103,7 +1136,7 @@ public class ProcessService {
childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
}
Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId());
updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
initSubInstanceState(childInstance);
createCommand(subProcessCommand);
logger.info("sub process command created: {} ", subProcessCommand);
@ -1152,6 +1185,7 @@ public class ProcessService {
CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
Map<String, String> subProcessParam = JSONUtils.toMap(task.getTaskParams());
int childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(childDefineId);
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
@ -1169,7 +1203,7 @@ public class ProcessService {
TaskDependType.TASK_POST,
parentProcessInstance.getFailureStrategy(),
parentProcessInstance.getExecutorId(),
childDefineId,
processDefinition.getCode(),
processParam,
parentProcessInstance.getWarningType(),
parentProcessInstance.getWarningGroupId(),
@ -1208,12 +1242,12 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
* @param childDefinitionId childDefinitionId
* @param childDefinitionCode childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
parentProcessInstance.getProcessDefinitionVersion());
ProcessDefinition childDefinition = this.findProcessDefineById(childDefinitionId);
ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode);
if (childDefinition != null && fatherDefinition != null) {
childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
processDefineMapper.updateById(childDefinition);
@ -1693,7 +1727,7 @@ public class ProcessService {
//2 insert into recover command
Command cmd = new Command();
cmd.setProcessDefinitionId(processDefinition.getId());
cmd.setProcessDefinitionCode(processDefinition.getCode());
cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java

@ -93,7 +93,7 @@ public class ProcessScheduleJob implements Job {
command.setCommandType(CommandType.SCHEDULER);
command.setExecutorId(schedule.getUserId());
command.setFailureStrategy(schedule.getFailureStrategy());
//command.setProcessDefinitionId(schedule.getProcessDefinitionCode()); TODO next pr
command.setProcessDefinitionCode(schedule.getProcessDefinitionCode());
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());

60
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -126,6 +126,9 @@ public class ProcessServiceTest {
//father history: start; child null == command type: start
parentInstance.setHistoryCmd("START_PROCESS");
parentInstance.setCommandType(CommandType.START_PROCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition);
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
@ -227,16 +230,15 @@ public class ProcessServiceTest {
String host = "127.0.0.1";
int validThreadNum = 1;
Command command = new Command();
command.setProcessDefinitionId(222);
command.setProcessDefinitionCode(222);
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}");
Mockito.when(processDefineMapper.selectById(command.getProcessDefinitionId())).thenReturn(null);
Assert.assertNull(processService.handleCommand(logger, host, validThreadNum, command));
//there is not enough thread for this command
Command command1 = new Command();
command1.setProcessDefinitionId(123);
command1.setProcessDefinitionCode(123);
command1.setCommandParam("{\"ProcessInstanceId\":222}");
command1.setCommandType(CommandType.START_PROCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
@ -254,31 +256,35 @@ public class ProcessServiceTest {
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(222);
Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition);
processInstance.setProcessDefinitionCode(11L);
processInstance.setProcessDefinitionVersion(1);
Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition);
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command1));
Command command2 = new Command();
command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command2.setProcessDefinitionId(123);
command2.setProcessDefinitionCode(123);
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command2));
Command command3 = new Command();
command3.setProcessDefinitionId(123);
command3.setProcessDefinitionCode(123);
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command3));
Command command4 = new Command();
command4.setProcessDefinitionId(123);
command4.setProcessDefinitionCode(123);
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command4.setCommandType(CommandType.REPEAT_RUNNING);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command4));
Command command5 = new Command();
command5.setProcessDefinitionId(123);
command5.setProcessDefinitionCode(123);
HashMap<String, String> startParams = new HashMap<>();
startParams.put("startParam1", "testStartParam1");
HashMap<String, String> commandParams = new HashMap<>();
@ -317,20 +323,36 @@ public class ProcessServiceTest {
@Test
public void testRecurseFindSubProcessId() {
int parentProcessDefineId = 1;
long parentProcessDefineCode = 1L;
int parentProcessDefineVersion = 1;
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(10L);
int parentId = 111;
List<Integer> ids = new ArrayList<>();
ProcessDefinition processDefinition2 = new ProcessDefinition();
processDefinition2.setCode(11L);
Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition);
processDefinition.setCode(parentProcessDefineCode);
processDefinition.setVersion(parentProcessDefineVersion);
Mockito.when(processDefineMapper.selectById(parentProcessDefineId)).thenReturn(processDefinition);
long postTaskCode = 2L;
int postTaskVersion = 2;
List<ProcessTaskRelationLog> relationLogList = new ArrayList<>();
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong()
, Mockito.anyInt()))
.thenReturn(relationLogList);
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setPostTaskCode(postTaskCode);
processTaskRelationLog.setPostTaskVersion(postTaskVersion);
relationLogList.add(processTaskRelationLog);
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(parentProcessDefineCode
, parentProcessDefineVersion)).thenReturn(relationLogList);
processService.recurseFindSubProcessId(parentId, ids);
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
TaskDefinitionLog taskDefinitionLog1 = new TaskDefinitionLog();
taskDefinitionLog1.setTaskParams("{\"processDefinitionId\": 123}");
taskDefinitionLogs.add(taskDefinitionLog1);
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(Mockito.anySet())).thenReturn(taskDefinitionLogs);
List<Integer> ids = new ArrayList<>();
processService.recurseFindSubProcessId(parentProcessDefineId, ids);
Assert.assertEquals(1, ids.size());
}
@Test
@ -499,7 +521,7 @@ public class ProcessServiceTest {
@Test
public void testCreateCommand() {
Command command = new Command();
command.setProcessDefinitionId(123);
command.setProcessDefinitionCode(123);
command.setCommandParam("{\"ProcessInstanceId\":222}");
command.setCommandType(CommandType.START_PROCESS);
int mockResult = 1;

4
sql/dolphinscheduler_mysql.sql

@ -319,7 +319,7 @@ DROP TABLE IF EXISTS `t_ds_command`;
CREATE TABLE `t_ds_command` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`command_type` tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 start workflow, 1 start execution from current node, 2 resume fault-tolerant workflow, 3 resume pause process, 4 start execution from failed node, 5 complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread',
`process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id',
`process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code',
`command_param` text COMMENT 'json command parameters',
`task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'Node dependency type: 0 current node, 1 forward, 2 backward',
`failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 end, 1 continue',
@ -367,7 +367,7 @@ CREATE TABLE `t_ds_error_command` (
`id` int(11) NOT NULL COMMENT 'key',
`command_type` tinyint(4) DEFAULT NULL COMMENT 'command type',
`executor_id` int(11) DEFAULT NULL COMMENT 'executor id',
`process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id',
`process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code',
`command_param` text COMMENT 'json command parameters',
`task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'task depend type',
`failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'failure strategy',

4
sql/dolphinscheduler_postgre.sql

@ -220,7 +220,7 @@ DROP TABLE IF EXISTS t_ds_command;
CREATE TABLE t_ds_command (
id int NOT NULL ,
command_type int DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
process_definition_code bigint NOT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
failure_strategy int DEFAULT '0' ,
@ -262,7 +262,7 @@ CREATE TABLE t_ds_error_command (
id int NOT NULL ,
command_type int DEFAULT NULL ,
executor_id int DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
process_definition_code bigint NOT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
failure_strategy int DEFAULT '0' ,

Loading…
Cancel
Save