From a405abec9e7486f1bcefa7aa56a790caa37fc7dc Mon Sep 17 00:00:00 2001 From: Gallardot Date: Fri, 12 Jan 2024 17:53:24 +0800 Subject: [PATCH] [Bug][Master]serial_wait strategy workflow unable to wake up (#15270) * fix: serial_wait strategy workflow unable to wake up Signed-off-by: Gallardot * fix: serial_wait strategy workflow unable to wake up Signed-off-by: Gallardot --------- Signed-off-by: Gallardot Co-authored-by: fuchanghai --- .../dao/repository/ProcessInstanceDao.java | 7 +++++++ .../dao/repository/impl/ProcessInstanceDaoImpl.java | 9 +++++++++ .../server/master/runner/WorkflowExecuteRunnable.java | 2 +- .../service/process/ProcessServiceImpl.java | 10 +++++----- 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index 91a29eacba..6aa48ea12d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -31,6 +31,13 @@ public interface ProcessInstanceDao extends IDao { */ void upsertProcessInstance(ProcessInstance processInstance); + /** + * performs an "upsert" operation (update or insert) on a ProcessInstance object within a new transaction + * + * @param processInstance processInstance + */ + void performTransactionalUpsert(ProcessInstance processInstance); + /** * find last scheduler process instance in the date interval * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java index 688556c2da..fca93da29d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -30,6 +30,9 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; @Slf4j @Repository @@ -53,6 +56,12 @@ public class ProcessInstanceDaoImpl extends BaseDao runningProcessInstances = @@ -349,12 +349,12 @@ public class ProcessServiceImpl implements ProcessService { processInstance.getId()); if (CollectionUtils.isNotEmpty(runningProcessInstances)) { processInstance.setStateWithDesc(WorkflowExecutionStatus.STOP, "stop by serial_discard strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); return; } processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "submit from serial_discard strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); } else if (processDefinition.getExecutionType().typeIsSerialPriority()) { List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId( @@ -384,7 +384,7 @@ public class ProcessServiceImpl implements ProcessService { } processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "submit by serial_priority strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); } }