diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index 91a29eacba..6aa48ea12d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -31,6 +31,13 @@ public interface ProcessInstanceDao extends IDao { */ void upsertProcessInstance(ProcessInstance processInstance); + /** + * performs an "upsert" operation (update or insert) on a ProcessInstance object within a new transaction + * + * @param processInstance processInstance + */ + void performTransactionalUpsert(ProcessInstance processInstance); + /** * find last scheduler process instance in the date interval * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java index 688556c2da..fca93da29d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -30,6 +30,9 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; @Slf4j @Repository @@ -53,6 +56,12 @@ public class ProcessInstanceDaoImpl extends BaseDao runningProcessInstances = @@ -349,12 +349,12 @@ public class ProcessServiceImpl implements ProcessService { processInstance.getId()); if (CollectionUtils.isNotEmpty(runningProcessInstances)) { processInstance.setStateWithDesc(WorkflowExecutionStatus.STOP, "stop by serial_discard strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); return; } processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "submit from serial_discard strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); } else if (processDefinition.getExecutionType().typeIsSerialPriority()) { List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId( @@ -384,7 +384,7 @@ public class ProcessServiceImpl implements ProcessService { } processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "submit by serial_priority strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); } }