Browse Source

[Fix-11007] [Master] fix forced_success bug (#11088)

* fix forced_success bug

* add comments

* add transactional

* refactor code

Co-authored-by: JinyLeeChina <jiny.li@foxmail.com>

(cherry picked from commit e5cca0e79b)
3.0.0/version-upgrade
JinYong Li 2 years ago committed by Jiajie Zhong
parent
commit
68a73b04a2
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  2. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  3. 38
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -46,6 +46,7 @@ import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -163,6 +164,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
* @param taskInstanceId task instance id
* @return the result code and msg
*/
@Transactional
@Override
public Map<String, Object> forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) {
Project project = projectMapper.queryByCode(projectCode);
@ -195,6 +197,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -295,4 +295,6 @@ public interface ProcessService {
org.apache.dolphinscheduler.remote.command.CommandType taskType);
ProcessInstance loadNextProcess4Serial(long code, int state, int id);
void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
}

38
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -1308,10 +1308,10 @@ public class ProcessServiceImpl implements ProcessService {
*
* @param parentInstance parentInstance
* @param parentTask parentTask
* @param processMap processMap
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask, ProcessInstanceMap processMap) {
if (processMap != null) {
return processMap;
}
@ -1375,11 +1375,16 @@ public class ProcessServiceImpl implements ProcessService {
// recover failover tolerance would not create a new command when the sub command already have been created
return;
}
instanceMap = setProcessInstanceMap(parentProcessInstance, task);
instanceMap = setProcessInstanceMap(parentProcessInstance, task, instanceMap);
ProcessInstance childInstance = null;
if (instanceMap.getProcessInstanceId() != 0) {
childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
}
if (childInstance != null && childInstance.getState() == ExecutionStatus.SUCCESS
&& CommandType.START_FAILURE_TASK_PROCESS == parentProcessInstance.getCommandType()) {
logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId());
return;
}
Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
initSubInstanceState(childInstance);
@ -3050,4 +3055,31 @@ public class ProcessServiceImpl implements ProcessService {
throw new ServiceException("delete command fail, id:" + commandId);
}
}
@Override
public void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId) {
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
return;
}
ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId());
if (processInstance != null && (processInstance.getState().typeIsFailure() || processInstance.getState().typeIsCancel())) {
List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
List<Long> instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
List<ProcessTaskRelation> taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelations);
List<Long> definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES)
.map(TaskDefinitionLog::getCode).collect(Collectors.toList());
// only all tasks have instances
if (org.apache.dolphinscheduler.common.utils.CollectionUtils.equalLists(instanceTaskCodeList, definiteTaskCodeList)) {
List<Integer> failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure() || instance.getState().typeIsCancel())
.map(TaskInstance::getId).collect(Collectors.toList());
if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) {
processInstance.setState(ExecutionStatus.SUCCESS);
updateProcessInstance(processInstance);
}
}
}
}
}
Loading…
Cancel
Save