From 92b92b5f5a6d5202f031c781001a14725fb6219e Mon Sep 17 00:00:00 2001 From: wen-hemin <39549317+wen-hemin@users.noreply.github.com> Date: Tue, 3 Aug 2021 11:54:39 +0800 Subject: [PATCH] [Fix-5519] The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code (#5937) * fix: the t_ds_schedules table, process_definition_id change to process_definition_code * fix checkstyle * fix: recovery code * fix UT * fix: The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code * fix comment * fix checkstyle * fix: remove duplacated lines * fix: remove TODO Co-authored-by: wen-hemin --- .../api/service/impl/ExecutorServiceImpl.java | 26 ++--- .../dolphinscheduler/dao/entity/Command.java | 71 ++++++------- .../dao/entity/ErrorCommand.java | 99 +++++++------------ .../dao/mapper/CommandMapper.xml | 6 +- .../dao/mapper/ErrorCommandMapper.xml | 4 +- .../dao/mapper/CommandMapperTest.java | 73 +++++++------- .../dao/mapper/ErrorCommandMapperTest.java | 11 ++- .../server/master/MasterCommandTest.java | 22 ++--- .../service/process/ProcessService.java | 72 ++++++++++---- .../service/quartz/ProcessScheduleJob.java | 2 +- .../service/process/ProcessServiceTest.java | 60 +++++++---- sql/dolphinscheduler_mysql.sql | 4 +- sql/dolphinscheduler_postgre.sql | 4 +- 13 files changed, 243 insertions(+), 211 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 89782578cf..c3b422f65b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -160,7 +160,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * create command */ - int create = this.createCommand(commandType, processDefinition.getId(), + int create = this.createCommand(commandType, processDefinition.getCode(), taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, startParams); @@ -276,13 +276,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ switch (executeType) { case REPEAT_RUNNING: - result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams); break; case RECOVER_SUSPENDED_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); break; case START_FAILURE_TASK_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS, startParams); break; case STOP: if (processInstance.getState() == ExecutionStatus.READY_STOP) { @@ -394,11 +394,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * * @param loginUser login user * @param instanceId instance id - * @param processDefinitionId process definition id + * @param processDefinitionCode process definition code * @param commandType command type * @return insert result code */ - private Map insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType, String startParams) { + private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, CommandType commandType, String startParams) { Map result = new HashMap<>(); //To add startParams only when repeat running is needed @@ -410,12 +410,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ Command command = new Command(); command.setCommandType(commandType); - command.setProcessDefinitionId(processDefinitionId); + command.setProcessDefinitionCode(processDefinitionCode); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setExecutorId(loginUser.getId()); if (!processService.verifyIsNeedCreateCommand(command)) { - putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId); + putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode); return result; } @@ -475,7 +475,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * create command * * @param commandType commandType - * @param processDefineId processDefineId + * @param processDefineCode processDefineCode * @param nodeDep nodeDep * @param failureStrategy failureStrategy * @param startNodeList startNodeList @@ -488,7 +488,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param workerGroup workerGroup * @return command id */ - private int createCommand(CommandType commandType, int processDefineId, + private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, WarningType warningType, int executorId, int warningGroupId, @@ -506,7 +506,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } else { command.setCommandType(commandType); } - command.setProcessDefinitionId(processDefineId); + command.setProcessDefinitionCode(processDefineCode); if (nodeDep != null) { command.setTaskDependType(nodeDep); } @@ -549,7 +549,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setCommandParam(JSONUtils.toJsonString(cmdParam)); return processService.createCommand(command); } else if (runMode == RunMode.RUN_MODE_PARALLEL) { - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineId); // TODO: next pr change to code + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineCode); List listDate = new LinkedList<>(); if (!CollectionUtils.isEmpty(schedules)) { for (Schedule item : schedules) { @@ -580,7 +580,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } } } else { - logger.error("there is not valid schedule date for the process definition: id:{}", processDefineId); + logger.error("there is not valid schedule date for the process definition code:{}", processDefineCode); } } else { command.setCommandParam(JSONUtils.toJsonString(cmdParam)); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index cba0151828..bef466e7e8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -14,15 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; + +import java.util.Date; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import org.apache.dolphinscheduler.common.enums.*; - -import java.util.Date; /** * command @@ -33,7 +39,7 @@ public class Command { /** * id */ - @TableId(value="id", type=IdType.AUTO) + @TableId(value = "id", type = IdType.AUTO) private int id; /** @@ -43,10 +49,10 @@ public class Command { private CommandType commandType; /** - * process definition id + * process definition code */ - @TableField("process_definition_id") - private int processDefinitionId; + @TableField("process_definition_code") + private long processDefinitionCode; /** * executor id @@ -126,7 +132,7 @@ public class Command { TaskDependType taskDependType, FailureStrategy failureStrategy, int executorId, - int processDefinitionId, + long processDefinitionCode, String commandParam, WarningType warningType, int warningGroupId, @@ -135,7 +141,7 @@ public class Command { Priority processInstancePriority) { this.commandType = commandType; this.executorId = executorId; - this.processDefinitionId = processDefinitionId; + this.processDefinitionCode = processDefinitionCode; this.commandParam = commandParam; this.warningType = warningType; this.warningGroupId = warningGroupId; @@ -148,7 +154,6 @@ public class Command { this.processInstancePriority = processInstancePriority; } - public TaskDependType getTaskDependType() { return taskDependType; } @@ -173,15 +178,14 @@ public class Command { this.commandType = commandType; } - public int getProcessDefinitionId() { - return processDefinitionId; + public long getProcessDefinitionCode() { + return processDefinitionCode; } - public void setProcessDefinitionId(int processDefinitionId) { - this.processDefinitionId = processDefinitionId; + public void setProcessDefinitionCode(long processDefinitionCode) { + this.processDefinitionCode = processDefinitionCode; } - public FailureStrategy getFailureStrategy() { return failureStrategy; } @@ -276,7 +280,7 @@ public class Command { if (id != command.id) { return false; } - if (processDefinitionId != command.processDefinitionId) { + if (processDefinitionCode != command.processDefinitionCode) { return false; } if (executorId != command.executorId) { @@ -320,7 +324,7 @@ public class Command { public int hashCode() { int result = id; result = 31 * result + (commandType != null ? commandType.hashCode() : 0); - result = 31 * result + processDefinitionId; + result = 31 * result + Long.hashCode(processDefinitionCode); result = 31 * result + executorId; result = 31 * result + (commandParam != null ? commandParam.hashCode() : 0); result = 31 * result + (taskDependType != null ? taskDependType.hashCode() : 0); @@ -334,24 +338,25 @@ public class Command { result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0); return result; } + @Override public String toString() { - return "Command{" + - "id=" + id + - ", commandType=" + commandType + - ", processDefinitionId=" + processDefinitionId + - ", executorId=" + executorId + - ", commandParam='" + commandParam + '\'' + - ", taskDependType=" + taskDependType + - ", failureStrategy=" + failureStrategy + - ", warningType=" + warningType + - ", warningGroupId=" + warningGroupId + - ", scheduleTime=" + scheduleTime + - ", startTime=" + startTime + - ", processInstancePriority=" + processInstancePriority + - ", updateTime=" + updateTime + - ", workerGroup='" + workerGroup + '\'' + - '}'; + return "Command{" + + "id=" + id + + ", commandType=" + commandType + + ", processDefinitionCode=" + processDefinitionCode + + ", executorId=" + executorId + + ", commandParam='" + commandParam + '\'' + + ", taskDependType=" + taskDependType + + ", failureStrategy=" + failureStrategy + + ", warningType=" + warningType + + ", warningGroupId=" + warningGroupId + + ", scheduleTime=" + scheduleTime + + ", startTime=" + startTime + + ", processInstancePriority=" + processInstancePriority + + ", updateTime=" + updateTime + + ", workerGroup='" + workerGroup + '\'' + + '}'; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java index 760bb23d90..c4a5e6070a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java @@ -14,15 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; + +import java.util.Date; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonFormat; -import org.apache.dolphinscheduler.common.enums.*; - -import java.util.Date; /** * command @@ -33,7 +39,7 @@ public class ErrorCommand { /** * id */ - @TableId(value="id", type = IdType.INPUT) + @TableId(value = "id", type = IdType.INPUT) private int id; /** @@ -42,9 +48,9 @@ public class ErrorCommand { private CommandType commandType; /** - * process definition id + * process definition code */ - private int processDefinitionId; + private long processDefinitionCode; /** * executor id @@ -79,13 +85,13 @@ public class ErrorCommand { /** * schedule time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date scheduleTime; /** * start time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date startTime; /** @@ -96,7 +102,7 @@ public class ErrorCommand { /** * update time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date updateTime; /** @@ -111,11 +117,11 @@ public class ErrorCommand { public ErrorCommand(){} - public ErrorCommand(Command command, String message){ + public ErrorCommand(Command command, String message) { this.id = command.getId(); this.commandType = command.getCommandType(); this.executorId = command.getExecutorId(); - this.processDefinitionId = command.getProcessDefinitionId(); + this.processDefinitionCode = command.getProcessDefinitionCode(); this.commandParam = command.getCommandParam(); this.warningType = command.getWarningType(); this.warningGroupId = command.getWarningGroupId(); @@ -128,34 +134,6 @@ public class ErrorCommand { this.message = message; } - public ErrorCommand( - CommandType commandType, - TaskDependType taskDependType, - FailureStrategy failureStrategy, - int executorId, - int processDefinitionId, - String commandParam, - WarningType warningType, - int warningGroupId, - Date scheduleTime, - Priority processInstancePriority, - String message){ - this.commandType = commandType; - this.executorId = executorId; - this.processDefinitionId = processDefinitionId; - this.commandParam = commandParam; - this.warningType = warningType; - this.warningGroupId = warningGroupId; - this.scheduleTime = scheduleTime; - this.taskDependType = taskDependType; - this.failureStrategy = failureStrategy; - this.startTime = new Date(); - this.updateTime = new Date(); - this.processInstancePriority = processInstancePriority; - this.message = message; - } - - public TaskDependType getTaskDependType() { return taskDependType; } @@ -180,15 +158,14 @@ public class ErrorCommand { this.commandType = commandType; } - public int getProcessDefinitionId() { - return processDefinitionId; + public long getProcessDefinitionCode() { + return processDefinitionCode; } - public void setProcessDefinitionId(int processDefinitionId) { - this.processDefinitionId = processDefinitionId; + public void setProcessDefinitionCode(long processDefinitionCode) { + this.processDefinitionCode = processDefinitionCode; } - public FailureStrategy getFailureStrategy() { return failureStrategy; } @@ -279,22 +256,22 @@ public class ErrorCommand { @Override public String toString() { - return "ErrorCommand{" + - "id=" + id + - ", commandType=" + commandType + - ", processDefinitionId=" + processDefinitionId + - ", executorId=" + executorId + - ", commandParam='" + commandParam + '\'' + - ", taskDependType=" + taskDependType + - ", failureStrategy=" + failureStrategy + - ", warningType=" + warningType + - ", warningGroupId=" + warningGroupId + - ", scheduleTime=" + scheduleTime + - ", startTime=" + startTime + - ", processInstancePriority=" + processInstancePriority + - ", updateTime=" + updateTime + - ", message='" + message + '\'' + - ", workerGroup='" + workerGroup + '\'' + - '}'; + return "ErrorCommand{" + + "id=" + id + + ", commandType=" + commandType + + ", processDefinitionCode=" + processDefinitionCode + + ", executorId=" + executorId + + ", commandParam='" + commandParam + '\'' + + ", taskDependType=" + taskDependType + + ", failureStrategy=" + failureStrategy + + ", warningType=" + warningType + + ", warningGroupId=" + warningGroupId + + ", scheduleTime=" + scheduleTime + + ", startTime=" + startTime + + ", processInstancePriority=" + processInstancePriority + + ", updateTime=" + updateTime + + ", message='" + message + '\'' + + ", workerGroup='" + workerGroup + '\'' + + '}'; } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index c0728f2e43..38f5bffe06 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -19,11 +19,11 @@ select cmd.command_type as command_type, count(1) as count from t_ds_command cmd, t_ds_process_definition process - where cmd.process_definition_id = process.id + where cmd.process_definition_code = process.code and process.project_code in diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml index 5f93854a3d..8179ff44d1 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml @@ -21,7 +21,7 @@ - \ No newline at end of file + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java index dd73bc36f6..dc9dbeebe7 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java @@ -14,15 +14,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.mapper; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.CommandCount; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.common.enums.*; -import org.junit.Assert; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -31,14 +50,6 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; - /** * command mapper test */ @@ -48,24 +59,21 @@ import static org.junit.Assert.*; @Rollback(true) public class CommandMapperTest { - @Autowired CommandMapper commandMapper; @Autowired ProcessDefinitionMapper processDefinitionMapper; - /** * test insert */ @Test - public void testInsert(){ + public void testInsert() { Command command = createCommand(); assertThat(command.getId(),greaterThan(0)); } - /** * test select by id */ @@ -76,14 +84,14 @@ public class CommandMapperTest { Command actualCommand = commandMapper.selectById(expectedCommand.getId()); assertNotNull(actualCommand); - assertEquals(expectedCommand.getProcessDefinitionId(), actualCommand.getProcessDefinitionId()); + assertEquals(expectedCommand.getProcessDefinitionCode(), actualCommand.getProcessDefinitionCode()); } /** * test update */ @Test - public void testUpdate(){ + public void testUpdate() { Command expectedCommand = createCommand(); @@ -103,7 +111,7 @@ public class CommandMapperTest { * test delete */ @Test - public void testDelete(){ + public void testDelete() { Command expectedCommand = createCommand(); commandMapper.deleteById(expectedCommand.getId()); @@ -124,7 +132,6 @@ public class CommandMapperTest { Map commandMap = createCommandMap(count); - List actualCommands = commandMapper.selectList(null); assertThat(actualCommands.size(), greaterThanOrEqualTo(count)); @@ -138,7 +145,7 @@ public class CommandMapperTest { ProcessDefinition processDefinition = createProcessDefinition(); - Command expectedCommand = createCommand(CommandType.START_PROCESS,processDefinition.getId()); + createCommand(CommandType.START_PROCESS, processDefinition.getCode()); Command actualCommand = commandMapper.getOneToRun(); @@ -154,7 +161,7 @@ public class CommandMapperTest { ProcessDefinition processDefinition = createProcessDefinition(); - CommandCount expectedCommandCount = createCommandMap(count, CommandType.START_PROCESS, processDefinition.getId()); + createCommandMap(count, CommandType.START_PROCESS, processDefinition.getCode()); Long[] projectCodeArray = {processDefinition.getProjectCode()}; @@ -167,23 +174,22 @@ public class CommandMapperTest { assertThat(actualCommandCounts.size(),greaterThanOrEqualTo(1)); } - /** * create command map * @param count map count * @param commandType comman type - * @param processDefinitionId process definition id + * @param processDefinitionCode process definition code * @return command map */ private CommandCount createCommandMap( Integer count, CommandType commandType, - Integer processDefinitionId){ + long processDefinitionCode) { CommandCount commandCount = new CommandCount(); - for (int i = 0 ;i < count ;i++){ - createCommand(commandType,processDefinitionId); + for (int i = 0;i < count;i++) { + createCommand(commandType, processDefinitionCode); } commandCount.setCommandType(commandType); commandCount.setCount(count); @@ -195,7 +201,7 @@ public class CommandMapperTest { * create process definition * @return process definition */ - private ProcessDefinition createProcessDefinition(){ + private ProcessDefinition createProcessDefinition() { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setCode(1L); processDefinition.setReleaseState(ReleaseState.ONLINE); @@ -215,22 +221,21 @@ public class CommandMapperTest { * @param count map count * @return command map */ - private Map createCommandMap(Integer count){ + private Map createCommandMap(Integer count) { Map commandMap = new HashMap<>(); - for (int i = 0; i < count ;i++){ + for (int i = 0;i < count;i++) { Command command = createCommand(); commandMap.put(command.getId(),command); } return commandMap; } - /** * create command * @return */ - private Command createCommand(){ + private Command createCommand() { return createCommand(CommandType.START_PROCESS,1); } @@ -238,11 +243,11 @@ public class CommandMapperTest { * create command * @return Command */ - private Command createCommand(CommandType commandType,Integer processDefinitionId){ + private Command createCommand(CommandType commandType, long processDefinitionCode) { Command command = new Command(); command.setCommandType(commandType); - command.setProcessDefinitionId(processDefinitionId); + command.setProcessDefinitionCode(processDefinitionCode); command.setExecutorId(4); command.setCommandParam("test command param"); command.setTaskDependType(TaskDependType.TASK_ONLY); @@ -259,6 +264,4 @@ public class CommandMapperTest { return command; } - - -} \ No newline at end of file +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java index 6e18aa97d2..58bd93a915 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java @@ -21,6 +21,10 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.dao.entity.CommandCount; import org.apache.dolphinscheduler.dao.entity.ErrorCommand; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; + +import java.util.Date; +import java.util.List; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -30,9 +34,6 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.List; - @RunWith(SpringRunner.class) @SpringBootTest @Transactional @@ -81,7 +82,7 @@ public class ErrorCommandMapperTest { processDefinition.setCreateTime(new Date()); processDefinitionMapper.insert(processDefinition); - errorCommand.setProcessDefinitionId(processDefinition.getId()); + errorCommand.setProcessDefinitionCode(processDefinition.getCode()); errorCommandMapper.updateById(errorCommand); @@ -103,4 +104,4 @@ public class ErrorCommandMapperTest { Assert.assertNotEquals(commandCounts.size(), 0); Assert.assertNotEquals(commandCounts2.size(), 0); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java index d402afcee2..2093d95bbd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java @@ -18,26 +18,16 @@ package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; -import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.common.graph.DAG; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.model.TaskNodeRelation; -import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.mapper.CommandMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.utils.DagHelper; + import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; - /** * master test */ @@ -56,7 +46,7 @@ public class MasterCommandTest { Command cmd = new Command(); cmd.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); cmd.setCommandParam("{\"ProcessInstanceId\":325}"); - cmd.setProcessDefinitionId(63); + cmd.setProcessDefinitionCode(63); commandMapper.insert(cmd); @@ -66,7 +56,7 @@ public class MasterCommandTest { public void RecoverSuspendCommand(){ Command cmd = new Command(); - cmd.setProcessDefinitionId(44); + cmd.setProcessDefinitionCode(44); cmd.setCommandParam("{\"ProcessInstanceId\":290}"); cmd.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS); @@ -80,7 +70,7 @@ public class MasterCommandTest { public void startNewProcessCommand(){ Command cmd = new Command(); cmd.setCommandType(CommandType.START_PROCESS); - cmd.setProcessDefinitionId(167); + cmd.setProcessDefinitionCode(167); cmd.setFailureStrategy(FailureStrategy.CONTINUE); cmd.setWarningType(WarningType.NONE); cmd.setWarningGroupId(4); @@ -94,7 +84,7 @@ public class MasterCommandTest { Command cmd = new Command(); cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS); cmd.setCommandParam("{\"ProcessInstanceId\":816}"); - cmd.setProcessDefinitionId(15); + cmd.setProcessDefinitionCode(15); commandMapper.insert(cmd); } @@ -105,7 +95,7 @@ public class MasterCommandTest { cmd.setCommandType(CommandType.START_PROCESS); cmd.setFailureStrategy(FailureStrategy.CONTINUE); cmd.setWarningType(WarningType.ALL); - cmd.setProcessDefinitionId(72); + cmd.setProcessDefinitionCode(72); cmd.setExecutorId(10); commandMapper.insert(cmd); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 01e4678b6e..bd5e4df5c5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -270,7 +270,7 @@ public class ProcessService { * @return if thread is enough */ private boolean checkThreadNum(Command command, int validThreadNum) { - int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); + int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionCode()); return validThreadNum >= commandThreadCount; } @@ -469,12 +469,14 @@ public class ProcessService { /** * calculate sub process number in the process define. * - * @param processDefinitionId processDefinitionId + * @param processDefinitionCode processDefinitionCode * @return process thread num count */ - private Integer workProcessThreadNumCount(Integer processDefinitionId) { + private Integer workProcessThreadNumCount(long processDefinitionCode) { + ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode); + List ids = new ArrayList<>(); - recurseFindSubProcessId(processDefinitionId, ids); + recurseFindSubProcessId(processDefinition.getId(), ids); return ids.size() + 1; } @@ -497,7 +499,6 @@ public class ProcessService { ids.add(subProcessParam.getProcessDefinitionId()); recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(), ids); } - } } } @@ -529,7 +530,7 @@ public class ProcessService { processInstance.getTaskDependType(), processInstance.getFailureStrategy(), processInstance.getExecutorId(), - processInstance.getProcessDefinition().getId(), + processInstance.getProcessDefinition().getCode(), JSONUtils.toJsonString(cmdParam), processInstance.getWarningType(), processInstance.getWarningGroupId(), @@ -713,13 +714,10 @@ public class ProcessService { CommandType commandType = command.getCommandType(); Map cmdParam = JSONUtils.toMap(command.getCommandParam()); - ProcessDefinition processDefinition = null; - if (command.getProcessDefinitionId() != 0) { - processDefinition = processDefineMapper.selectById(command.getProcessDefinitionId()); - if (processDefinition == null) { - logger.error("cannot find the work process define! define id : {}", command.getProcessDefinitionId()); - return null; - } + ProcessDefinition processDefinition = getProcessDefinitionByCommand(command.getProcessDefinitionCode(), cmdParam); + if (processDefinition == null) { + logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode()); + return null; } if (cmdParam != null) { @@ -741,6 +739,7 @@ public class ProcessService { String pId = cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD); processInstanceId = Integer.parseInt(pId); } + if (processInstanceId == 0) { processInstance = generateNewProcessInstance(processDefinition, command, cmdParam); } else { @@ -868,6 +867,40 @@ public class ProcessService { return processInstance; } + /** + * get process definition by command + * If it is a fault-tolerant command, get the specified version of ProcessDefinition through ProcessInstance + * Otherwise, get the latest version of ProcessDefinition + * + * @param processDefinitionCode + * @param cmdParam + * @return ProcessDefinition + */ + private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map cmdParam) { + if (cmdParam != null) { + int processInstanceId = 0; + if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) { + processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)); + } else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) { + processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS)); + } else if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) { + processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)); + } + + if (processInstanceId != 0) { + ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId); + if (processInstance == null) { + return null; + } + + return processDefineLogMapper.queryByDefinitionCodeAndVersion( + processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); + } + } + + return processDefineMapper.queryByCode(processDefinitionCode); + } + /** * return complement data if the process start with complement data * @@ -1103,7 +1136,7 @@ public class ProcessService { childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId()); } Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); - updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId()); + updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode()); initSubInstanceState(childInstance); createCommand(subProcessCommand); logger.info("sub process command created: {} ", subProcessCommand); @@ -1152,6 +1185,7 @@ public class ProcessService { CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); Map subProcessParam = JSONUtils.toMap(task.getTaskParams()); int childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)); + ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(childDefineId); Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); @@ -1169,7 +1203,7 @@ public class ProcessService { TaskDependType.TASK_POST, parentProcessInstance.getFailureStrategy(), parentProcessInstance.getExecutorId(), - childDefineId, + processDefinition.getCode(), processParam, parentProcessInstance.getWarningType(), parentProcessInstance.getWarningGroupId(), @@ -1208,12 +1242,12 @@ public class ProcessService { * update sub process definition * * @param parentProcessInstance parentProcessInstance - * @param childDefinitionId childDefinitionId + * @param childDefinitionCode childDefinitionId */ - private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) { + private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) { ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(), parentProcessInstance.getProcessDefinitionVersion()); - ProcessDefinition childDefinition = this.findProcessDefineById(childDefinitionId); + ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode); if (childDefinition != null && fatherDefinition != null) { childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId()); processDefineMapper.updateById(childDefinition); @@ -1693,7 +1727,7 @@ public class ProcessService { //2 insert into recover command Command cmd = new Command(); - cmd.setProcessDefinitionId(processDefinition.getId()); + cmd.setProcessDefinitionCode(processDefinition.getCode()); cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId())); cmd.setExecutorId(processInstance.getExecutorId()); cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java index bda8ad899f..eacd8bcf09 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java @@ -93,7 +93,7 @@ public class ProcessScheduleJob implements Job { command.setCommandType(CommandType.SCHEDULER); command.setExecutorId(schedule.getUserId()); command.setFailureStrategy(schedule.getFailureStrategy()); - //command.setProcessDefinitionId(schedule.getProcessDefinitionCode()); TODO next pr + command.setProcessDefinitionCode(schedule.getProcessDefinitionCode()); command.setScheduleTime(scheduledFireTime); command.setStartTime(fireTime); command.setWarningGroupId(schedule.getWarningGroupId()); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 9eeec7896e..413a9d89f3 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -126,6 +126,9 @@ public class ProcessServiceTest { //father history: start; child null == command type: start parentInstance.setHistoryCmd("START_PROCESS"); parentInstance.setCommandType(CommandType.START_PROCESS); + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setCode(1L); + Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition); command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); @@ -227,16 +230,15 @@ public class ProcessServiceTest { String host = "127.0.0.1"; int validThreadNum = 1; Command command = new Command(); - command.setProcessDefinitionId(222); + command.setProcessDefinitionCode(222); command.setCommandType(CommandType.REPEAT_RUNNING); command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\"" + CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}"); - Mockito.when(processDefineMapper.selectById(command.getProcessDefinitionId())).thenReturn(null); Assert.assertNull(processService.handleCommand(logger, host, validThreadNum, command)); //there is not enough thread for this command Command command1 = new Command(); - command1.setProcessDefinitionId(123); + command1.setProcessDefinitionCode(123); command1.setCommandParam("{\"ProcessInstanceId\":222}"); command1.setCommandType(CommandType.START_PROCESS); ProcessDefinition processDefinition = new ProcessDefinition(); @@ -254,31 +256,35 @@ public class ProcessServiceTest { processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(222); - Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition); + processInstance.setProcessDefinitionCode(11L); + processInstance.setProcessDefinitionVersion(1); + Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition); + Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command1)); Command command2 = new Command(); command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); - command2.setProcessDefinitionId(123); + command2.setProcessDefinitionCode(123); command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS); Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command2)); Command command3 = new Command(); - command3.setProcessDefinitionId(123); + command3.setProcessDefinitionCode(123); command3.setCommandParam("{\"WaitingThreadInstanceId\":222}"); command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command3)); Command command4 = new Command(); - command4.setProcessDefinitionId(123); + command4.setProcessDefinitionCode(123); command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); command4.setCommandType(CommandType.REPEAT_RUNNING); Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command4)); Command command5 = new Command(); - command5.setProcessDefinitionId(123); + command5.setProcessDefinitionCode(123); HashMap startParams = new HashMap<>(); startParams.put("startParam1", "testStartParam1"); HashMap commandParams = new HashMap<>(); @@ -317,20 +323,36 @@ public class ProcessServiceTest { @Test public void testRecurseFindSubProcessId() { + int parentProcessDefineId = 1; + long parentProcessDefineCode = 1L; + int parentProcessDefineVersion = 1; + ProcessDefinition processDefinition = new ProcessDefinition(); - processDefinition.setCode(10L); - int parentId = 111; - List ids = new ArrayList<>(); - ProcessDefinition processDefinition2 = new ProcessDefinition(); - processDefinition2.setCode(11L); - Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition); + processDefinition.setCode(parentProcessDefineCode); + processDefinition.setVersion(parentProcessDefineVersion); + Mockito.when(processDefineMapper.selectById(parentProcessDefineId)).thenReturn(processDefinition); + + long postTaskCode = 2L; + int postTaskVersion = 2; + List relationLogList = new ArrayList<>(); - Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong() - , Mockito.anyInt())) - .thenReturn(relationLogList); + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setPostTaskCode(postTaskCode); + processTaskRelationLog.setPostTaskVersion(postTaskVersion); + relationLogList.add(processTaskRelationLog); + Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(parentProcessDefineCode + , parentProcessDefineVersion)).thenReturn(relationLogList); - processService.recurseFindSubProcessId(parentId, ids); + List taskDefinitionLogs = new ArrayList<>(); + TaskDefinitionLog taskDefinitionLog1 = new TaskDefinitionLog(); + taskDefinitionLog1.setTaskParams("{\"processDefinitionId\": 123}"); + taskDefinitionLogs.add(taskDefinitionLog1); + Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(Mockito.anySet())).thenReturn(taskDefinitionLogs); + + List ids = new ArrayList<>(); + processService.recurseFindSubProcessId(parentProcessDefineId, ids); + Assert.assertEquals(1, ids.size()); } @Test @@ -499,7 +521,7 @@ public class ProcessServiceTest { @Test public void testCreateCommand() { Command command = new Command(); - command.setProcessDefinitionId(123); + command.setProcessDefinitionCode(123); command.setCommandParam("{\"ProcessInstanceId\":222}"); command.setCommandType(CommandType.START_PROCESS); int mockResult = 1; diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index 11bdc8dbfa..d056b23667 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -319,7 +319,7 @@ DROP TABLE IF EXISTS `t_ds_command`; CREATE TABLE `t_ds_command` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key', `command_type` tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 start workflow, 1 start execution from current node, 2 resume fault-tolerant workflow, 3 resume pause process, 4 start execution from failed node, 5 complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread', - `process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id', + `process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code', `command_param` text COMMENT 'json command parameters', `task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'Node dependency type: 0 current node, 1 forward, 2 backward', `failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 end, 1 continue', @@ -367,7 +367,7 @@ CREATE TABLE `t_ds_error_command` ( `id` int(11) NOT NULL COMMENT 'key', `command_type` tinyint(4) DEFAULT NULL COMMENT 'command type', `executor_id` int(11) DEFAULT NULL COMMENT 'executor id', - `process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id', + `process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code', `command_param` text COMMENT 'json command parameters', `task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'task depend type', `failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'failure strategy', diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql index eb97912562..f3967817c3 100644 --- a/sql/dolphinscheduler_postgre.sql +++ b/sql/dolphinscheduler_postgre.sql @@ -220,7 +220,7 @@ DROP TABLE IF EXISTS t_ds_command; CREATE TABLE t_ds_command ( id int NOT NULL , command_type int DEFAULT NULL , - process_definition_id int DEFAULT NULL , + process_definition_code bigint NOT NULL , command_param text , task_depend_type int DEFAULT NULL , failure_strategy int DEFAULT '0' , @@ -262,7 +262,7 @@ CREATE TABLE t_ds_error_command ( id int NOT NULL , command_type int DEFAULT NULL , executor_id int DEFAULT NULL , - process_definition_id int DEFAULT NULL , + process_definition_code bigint NOT NULL , command_param text , task_depend_type int DEFAULT NULL , failure_strategy int DEFAULT '0' ,