Browse Source

加入valid检验

pull/3/MERGE
Zhou Zheng 4 years ago
parent
commit
d0e6b09dd1
  1. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 28
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -231,6 +231,7 @@ public enum Status {
EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028,"export process definition by id error", "导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029,"import process definition error", "导入工作流定义错误"),
NO_VALID_FORCED_SUCCESS_TASK(50030, "there is no valid forced success node in process instance {0}", "工作流实例[{0}]中不包含有效的强制成功的任务实例"),
HDFS_NOT_STARTUP(60001,"hdfs not startup", "hdfs未启用"),

28
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -266,7 +266,11 @@ public class ExecutorService extends BaseService{
}
break;
case RESUME_FROM_FORCED_SUCCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RESUME_FROM_FORCED_SUCCESS);
if (!this.checkValidForcedSuccessTask(processInstanceId)) {
putMsg(result, Status.NO_VALID_FORCED_SUCCESS_TASK, processInstance.getName());
} else {
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RESUME_FROM_FORCED_SUCCESS);
}
break;
default:
logger.error("unknown execute type : {}", executeType);
@ -600,4 +604,26 @@ public class ExecutorService extends BaseService{
return null;
}
/**
* check if the process instance contains valid forced success task
*
* @param processInstanceId
* @return
*/
private boolean checkValidForcedSuccessTask(int processInstanceId) {
List<Integer> forcedSuccessList = processService.findTaskIdByInstanceState(processInstanceId, ExecutionStatus.FORCED_SUCCESS);
if (forcedSuccessList != null && forcedSuccessList.size() > 0) {
return true;
}
List<Integer> failedSubList = processService.findTaskIdByInstanceStatusAndType(processInstanceId,
new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE},
TaskType.SUB_PROCESS);
for (int i = 0; i < failedSubList.size(); i++) {
if (processService.haveForcedSuccessInSubProcess(failedSubList.get(i))) {
return true;
}
}
return false;
}
}

Loading…
Cancel
Save