From db04a5b04df8aa5754741c6dd456579d743af1e9 Mon Sep 17 00:00:00 2001 From: OS <29528966+lenboo@users.noreply.github.com> Date: Mon, 11 Oct 2021 20:22:58 +0800 Subject: [PATCH] [Feature-6471]Cache Process definition in master (#6485) * feature-6471 Cache Process definition in master --- .../api/service/impl/ExecutorServiceImpl.java | 16 ++- .../dolphinscheduler/dao/entity/Command.java | 41 +++++- .../dao/mapper/CommandMapper.java | 7 - .../dao/mapper/CommandMapper.xml | 10 -- .../dao/mapper/CommandMapperTest.java | 4 +- .../server/master/config/MasterConfig.java | 12 ++ .../master/runner/MasterSchedulerService.java | 28 +++- .../src/main/resources/master.properties | 3 + .../service/process/ProcessService.java | 133 ++++++++---------- .../service/quartz/ProcessScheduleJob.java | 1 + .../service/process/ProcessServiceTest.java | 48 ++++--- sql/dolphinscheduler_h2.sql | 4 + sql/dolphinscheduler_mysql.sql | 34 +++-- sql/dolphinscheduler_postgre.sql | 63 +++++---- 14 files changed, 235 insertions(+), 169 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 5042a03714..fe5f9101b1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -283,13 +283,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ switch (executeType) { case REPEAT_RUNNING: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams); break; case RECOVER_SUSPENDED_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); break; case START_FAILURE_TASK_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams); break; case STOP: if (processInstance.getState() == ExecutionStatus.READY_STOP) { @@ -409,10 +409,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param loginUser login user * @param instanceId instance id * @param processDefinitionCode process definition code + * @param version * @param commandType command type * @return insert result code */ - private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, CommandType commandType, String startParams) { + private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) { Map result = new HashMap<>(); //To add startParams only when repeat running is needed @@ -427,6 +428,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setProcessDefinitionCode(processDefinitionCode); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setExecutorId(loginUser.getId()); + command.setProcessDefinitionVersion(processVersion); + command.setProcessInstanceId(instanceId); if (!processService.verifyIsNeedCreateCommand(command)) { putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode); @@ -545,6 +548,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setWorkerGroup(workerGroup); command.setEnvironmentCode(environmentCode); command.setDryRun(dryRun); + ProcessDefinition processDefinition = processService.findProcessDefinitionByCode(processDefineCode); + if (processDefinition != null) { + command.setProcessDefinitionVersion(processDefinition.getVersion()); + } + command.setProcessInstanceId(0); Date start = null; Date end = null; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index b1ed217537..ae2ff6258a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -132,6 +132,12 @@ public class Command { @TableField("dry_run") private int dryRun; + @TableField("process_instance_id") + private int processInstanceId; + + @TableField("process_definition_version") + private int processDefinitionVersion; + public Command() { this.taskDependType = TaskDependType.TASK_POST; this.failureStrategy = FailureStrategy.CONTINUE; @@ -152,7 +158,10 @@ public class Command { String workerGroup, Long environmentCode, Priority processInstancePriority, - int dryRun) { + int dryRun, + int processInstanceId, + int processDefinitionVersion + ) { this.commandType = commandType; this.executorId = executorId; this.processDefinitionCode = processDefinitionCode; @@ -168,6 +177,8 @@ public class Command { this.environmentCode = environmentCode; this.processInstancePriority = processInstancePriority; this.dryRun = dryRun; + this.processInstanceId = processInstanceId; + this.processDefinitionVersion = processDefinitionVersion; } public TaskDependType getTaskDependType() { @@ -298,6 +309,22 @@ public class Command { this.dryRun = dryRun; } + public int getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(int processInstanceId) { + this.processInstanceId = processInstanceId; + } + + public int getProcessDefinitionVersion() { + return processDefinitionVersion; + } + + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -353,8 +380,13 @@ public class Command { if (processInstancePriority != command.processInstancePriority) { return false; } + if (processInstanceId != command.processInstanceId) { + return false; + } + if (processDefinitionVersion != command.getProcessDefinitionVersion()) { + return false; + } return !(updateTime != null ? !updateTime.equals(command.updateTime) : command.updateTime != null); - } @Override @@ -375,6 +407,8 @@ public class Command { result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0); result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0); result = 31 * result + dryRun; + result = 31 * result + processInstanceId; + result = 31 * result + processDefinitionVersion; return result; } @@ -397,7 +431,10 @@ public class Command { + ", workerGroup='" + workerGroup + '\'' + ", environmentCode='" + environmentCode + '\'' + ", dryRun='" + dryRun + '\'' + + ", processInstanceId='" + processInstanceId + '\'' + + ", processDefinitionVersion='" + processDefinitionVersion + '\'' + '}'; } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index 2bbfb4b7b1..22913845c3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -31,13 +31,6 @@ import java.util.List; */ public interface CommandMapper extends BaseMapper { - - /** - * get one command - * @return command - */ - Command getOneToRun(); - /** * count command state * @param userId userId diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index 5b2d6b4d8c..b0ea477431 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -18,16 +18,6 @@ -