Browse Source

[Fix-11007] [Master] fix forced_success bug (#11088)

* fix forced_success bug

* add comments

* add transactional

* refactor code

Co-authored-by: JinyLeeChina <jiny.li@foxmail.com>
3.1.0-release
JinYong Li 2 years ago committed by GitHub
parent
commit
e5cca0e79b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  2. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  3. 38
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
@ -46,13 +49,11 @@ import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
/**
* task instance service impl
*/
@ -166,6 +167,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
* @param taskInstanceId task instance id
* @return the result code and msg
*/
@Transactional
@Override
public Map<String, Object> forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) {
Project project = projectMapper.queryByCode(projectCode);
@ -198,6 +200,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -299,4 +299,6 @@ public interface ProcessService {
ProcessInstance loadNextProcess4Serial(long code, int state, int id);
public String findConfigYamlByName(String clusterName) ;
void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
}

38
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -1337,10 +1337,10 @@ public class ProcessServiceImpl implements ProcessService {
*
* @param parentInstance parentInstance
* @param parentTask parentTask
* @param processMap processMap
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask, ProcessInstanceMap processMap) {
if (processMap != null) {
return processMap;
}
@ -1404,11 +1404,16 @@ public class ProcessServiceImpl implements ProcessService {
// recover failover tolerance would not create a new command when the sub command already have been created
return;
}
instanceMap = setProcessInstanceMap(parentProcessInstance, task);
instanceMap = setProcessInstanceMap(parentProcessInstance, task, instanceMap);
ProcessInstance childInstance = null;
if (instanceMap.getProcessInstanceId() != 0) {
childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
}
if (childInstance != null && childInstance.getState() == ExecutionStatus.SUCCESS
&& CommandType.START_FAILURE_TASK_PROCESS == parentProcessInstance.getCommandType()) {
logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId());
return;
}
Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
initSubInstanceState(childInstance);
@ -3106,4 +3111,31 @@ public class ProcessServiceImpl implements ProcessService {
K8s k8s = k8sMapper.selectOne(nodeWrapper);
return k8s.getK8sConfig();
}
@Override
public void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId) {
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
return;
}
ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId());
if (processInstance != null && (processInstance.getState().typeIsFailure() || processInstance.getState().typeIsCancel())) {
List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
List<Long> instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
List<ProcessTaskRelation> taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelations);
List<Long> definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES)
.map(TaskDefinitionLog::getCode).collect(Collectors.toList());
// only all tasks have instances
if (org.apache.dolphinscheduler.common.utils.CollectionUtils.equalLists(instanceTaskCodeList, definiteTaskCodeList)) {
List<Integer> failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure() || instance.getState().typeIsCancel())
.map(TaskInstance::getId).collect(Collectors.toList());
if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) {
processInstance.setState(ExecutionStatus.SUCCESS);
updateProcessInstance(processInstance);
}
}
}
}
}

Loading…
Cancel
Save