Browse Source

[Bug][Master]serial_wait strategy workflow unable to wake up (#15270)

* fix: serial_wait strategy workflow unable to wake up

Signed-off-by: Gallardot <gallardot@apache.org>

* fix: serial_wait strategy workflow unable to wake up

Signed-off-by: Gallardot <gallardot@apache.org>

---------

Signed-off-by: Gallardot <gallardot@apache.org>
Co-authored-by: fuchanghai <changhaifu@apache.org>
3.2.1-prepare
Gallardot 11 months ago committed by GitHub
parent
commit
a405abec9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
  2. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  4. 10
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

@ -31,6 +31,13 @@ public interface ProcessInstanceDao extends IDao<ProcessInstance> {
*/
void upsertProcessInstance(ProcessInstance processInstance);
/**
* performs an "upsert" operation (update or insert) on a ProcessInstance object within a new transaction
*
* @param processInstance processInstance
*/
void performTransactionalUpsert(ProcessInstance processInstance);
/**
* find last scheduler process instance in the date interval
*

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

@ -30,6 +30,9 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Repository
@ -53,6 +56,12 @@ public class ProcessInstanceDaoImpl extends BaseDao<ProcessInstance, ProcessInst
}
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public void performTransactionalUpsert(ProcessInstance processInstance) {
this.upsertProcessInstance(processInstance);
}
/**
* find last scheduler process instance in the date interval
*

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -1842,7 +1842,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
workflowInstance.setEndTime(new Date());
}
try {
processInstanceDao.updateById(workflowInstance);
processInstanceDao.performTransactionalUpsert(workflowInstance);
} catch (Exception ex) {
// recover the status
workflowInstance.setStateWithDesc(originStates, "recover state by DB error");

10
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -325,7 +325,7 @@ public class ProcessServiceImpl implements ProcessService {
protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinition processDefinition) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, "wait by serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
// serial wait
// when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
@ -338,7 +338,7 @@ public class ProcessServiceImpl implements ProcessService {
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
List<ProcessInstance> runningProcessInstances =
@ -349,12 +349,12 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.STOP, "stop by serial_discard strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
return;
}
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_discard strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
@ -384,7 +384,7 @@ public class ProcessServiceImpl implements ProcessService {
}
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit by serial_priority strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
}
}

Loading…
Cancel
Save