diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index bce8753781..5c43256623 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -207,8 +207,9 @@ public class MasterSchedulerService extends BaseDaemonThread { for (final Command command : commands) { masterPrepareExecService.execute(() -> { try { - // todo: this check is not safe, the slot may change after command transform. - // slot check again + // Note: this check is not safe, the slot may change after command transform. + // We use the database transaction in `handleCommand` so that we can guarantee the command will always be executed + // by only one master SlotCheckState slotCheckState = slotCheck(command); if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) { logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 495cb82026..97d1266830 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -138,6 +138,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.commons.collections.CollectionUtils; +import java.sql.SQLIntegrityConstraintViolationException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -373,7 +374,8 @@ public class ProcessServiceImpl implements ProcessService { } /** - * save error command, and delete original command + * Save error command, and delete original command. If the given command has already been moved into error command, + * will throw {@link SQLIntegrityConstraintViolationException ). * * @param command command * @param message message