Browse Source

[Feature-#14148][Task] Added status check interval and dependent failure policy parameters for dependent task nodes (#14150)

* Added a dependent task failure policy in the dependent task node
3.2.0-release
Kerwin 1 year ago committed by GitHub
parent
commit
8315279620
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      docs/docs/en/guide/task/dependent.md
  2. 8
      docs/docs/zh/guide/task/dependent.md
  3. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java
  4. 24
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
  5. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java
  6. 15
      dolphinscheduler-ui/src/locales/en_US/project.ts
  7. 19
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  8. 121
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts
  9. 13
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  10. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dependent.ts
  11. 17
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

9
docs/docs/en/guide/task/dependent.md

@ -16,9 +16,12 @@ Dependent nodes are **dependency check nodes**. For example, process A depends o
- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
| **Parameter** | **Description** |
|------------------|---------------------------------------------|
| Predecessor Task | The upstream task of the current task node. |
| **Parameter** | **Description** |
|---------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Add Dependencies | Configure dependent upstream tasks. |
| Check interval | Check the dependent upstream task status interval, the default is 10s. |
| Dependency failure policy | Failure: The dependent upstream task failure and the current task directly failure; Wait: The dependent upstream task failure and the current task continues to wait; |
| Dependency failure waiting time | When the dependency failure policy chooses to wait, the current task wait time. |
## Task Examples

8
docs/docs/zh/guide/task/dependent.md

@ -15,7 +15,13 @@ Dependent 节点,就是**依赖检查节点**。比如 A 流程依赖昨天的
[//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。)
- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
- 此任务除上述链接中的默认参数外无其他参数。
| **任务参数** | **描述** |
|----------|----------------------------------------------|
| 添加依赖 | 配置依赖的上游任务. |
| 检查间隔 | 检查依赖的上游任务状态间隔,默认10s. |
| 依赖失败策略 | 失败: 依赖的上游任务失败当前任务直接失败;等待: 依赖的上游任务失败当前任务继续等待; |
| 依赖失败等待时间 | 当依赖失败策略选择等待时,当前任务等待的时间. |
## 任务样例

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java

@ -56,7 +56,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFunction {
private static final Duration DEPENDENT_TASK_STATE_CHECK_INTERVAL = Duration.ofSeconds(10);
private static final Duration DEFAULT_STATE_CHECK_INTERVAL = Duration.ofSeconds(10);
private final TaskExecutionContext taskExecutionContext;
private final DependentParameters dependentParameters;
@ -195,6 +195,10 @@ public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFuncti
private boolean isAllDependentTaskFinished() {
boolean isAllDependentTaskFinished = true;
for (DependentExecute dependentExecute : dependentTaskList) {
if (!dependentExecute.finish(dependentDate, processInstance.getTestFlag(),
dependentParameters.getFailurePolicy(), dependentParameters.getFailureWaitingTime())) {
isAllDependentTaskFinished = false;
}
dependentExecute.getDependResultMap().forEach((dependentKey, dependResult) -> {
if (!dependResultMap.containsKey(dependentKey)) {
dependResultMap.put(dependentKey, dependResult);
@ -206,15 +210,13 @@ public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFuncti
dependResult, dependentDate);
}
});
if (!dependentExecute.finish(dependentDate, processInstance.getTestFlag())) {
isAllDependentTaskFinished = false;
}
}
return isAllDependentTaskFinished;
}
@Override
public @NonNull Duration getAsyncTaskStateCheckInterval() {
return DEPENDENT_TASK_STATE_CHECK_INTERVAL;
return dependentParameters.getCheckInterval() == null ? DEFAULT_STATE_CHECK_INTERVAL
: Duration.ofSeconds(dependentParameters.getCheckInterval());
}
}

24
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.utils;
import static org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters.DependentFailurePolicyEnum.DEPENDENT_FAILURE_WAITING;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -27,9 +29,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@ -62,11 +67,6 @@ public class DependentExecute {
private TaskInstance taskInstance;
/**
* depend result
*/
private DependResult modelDependResult = DependResult.WAITING;
/**
* depend result map
*/
@ -231,10 +231,15 @@ public class DependentExecute {
* @param currentTime current time
* @return boolean
*/
public boolean finish(Date currentTime, int testFlag) {
public boolean finish(Date currentTime, int testFlag, DependentParameters.DependentFailurePolicyEnum failurePolicy,
Integer failureWaitingTime) {
DependResult modelDependResult = getModelDependResult(currentTime, testFlag);
if (modelDependResult == DependResult.WAITING) {
modelDependResult = getModelDependResult(currentTime, testFlag);
return false;
} else if (modelDependResult == DependResult.FAILED && DEPENDENT_FAILURE_WAITING == failurePolicy
&& failureWaitingTime != null) {
return Duration.between(currentTime.toInstant(), Instant.now())
.compareTo(Duration.ofMinutes(failureWaitingTime)) > 0;
}
return true;
}
@ -260,13 +265,12 @@ public class DependentExecute {
continue;
}
DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag);
if (dependResult != DependResult.WAITING) {
if (dependResult != DependResult.WAITING && dependResult != DependResult.FAILED) {
dependResultMap.put(dependentItem.getKey(), dependResult);
}
dependResultList.add(dependResult);
}
modelDependResult = DependentUtils.getDependResultForRelation(this.relation, dependResultList);
return modelDependResult;
return DependentUtils.getDependResultForRelation(this.relation, dependResultList);
}
/**

13
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java

@ -31,10 +31,23 @@ public class DependentParameters extends AbstractParameters {
private List<DependentTaskModel> dependTaskList;
private DependentRelation relation;
/** Time unit is second */
private Integer checkInterval;
private DependentFailurePolicyEnum failurePolicy;
/** Time unit is minutes */
private Integer failureWaitingTime;
@Override
public boolean checkParameters() {
return true;
}
/**
* the dependent task failure policy.
*/
public enum DependentFailurePolicyEnum {
DEPENDENT_FAILURE_FAILURE,
DEPENDENT_FAILURE_WAITING
}
}

15
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -374,6 +374,7 @@ export default {
times: 'Times',
failed_retry_interval: 'Failed retry interval',
minute: 'Minute',
second: 'Second',
delay_execution_time: 'Delay execution time',
namespace_cluster: 'Namespace(Cluster)',
min_cpu: 'Min cpu',
@ -386,7 +387,8 @@ export default {
command_tips:
'Please enter the container execution command, for example: ["printenv"]',
args: 'Args',
args_tips: 'Please enter the container execution command args, for example: ["HOSTNAME", "KUBERNETES_PORT"]',
args_tips:
'Please enter the container execution command args, for example: ["HOSTNAME", "KUBERNETES_PORT"]',
min_memory_tips: 'Please enter min memory',
state: 'State',
branch_flow: 'Branch flow',
@ -623,6 +625,7 @@ export default {
add_dependency: 'Add dependency',
waiting_dependent_start: 'Waiting Dependent start',
check_interval: 'Check interval',
check_interval_tips: 'Check interval must be a positive integer',
waiting_dependent_complete: 'Waiting Dependent complete',
project_name: 'Project Name',
project_name_tips: 'Please select a project(required)',
@ -695,7 +698,7 @@ export default {
zeppelin_username: 'zeppelinUsername',
zeppelin_username_tips: 'Please enter the zeppelin server username',
zeppelin_password: 'zeppelinPassword',
zeppelin_password_tips: 'Please enter the zeppelin server password',
zeppelin_password_tips: 'Please enter the zeppelin server password',
hive_cli_task_execution_type: 'Hive Cli Task Execution Type',
hive_sql_script: 'Hive SQL Script',
hive_cli_options: 'Hive Cli Options',
@ -823,7 +826,13 @@ export default {
pipeline_name: 'Pipeline Name',
factory_tips: 'Please select factory',
resource_group_tips: 'Please select resource group',
pipeline_tips: 'Please select pipeline'
pipeline_tips: 'Please select pipeline',
dependent_failure_policy: 'Dependent failure policy',
dependent_failure_policy_failure: 'failure',
dependent_failure_policy_waiting: 'waiting',
dependent_failure_waiting_time: 'Dependent failure waiting time',
dependent_failure_waiting_time_tips:
'Failure waiting time must be a positive integer'
},
menu: {
fav: 'Favorites',

19
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -37,10 +37,10 @@ export default {
confirm: '确定',
cancel: '取消',
delete_confirm: '确定删除吗?',
authorize_level:'权限等级',
authorize_level: '权限等级',
no_permission: '无权限',
read_permission: '读权限',
all_permission: '所有权限',
all_permission: '所有权限'
},
workflow: {
on_line: '线上',
@ -221,7 +221,7 @@ export default {
workflow_relation_no_data_result_desc:
'目前没有任何工作流,请先创建工作流,再访问该页面',
ready_to_block: '准备锁定',
block: '锁定',
block: '锁定'
},
task: {
on_line: '线上',
@ -335,7 +335,7 @@ export default {
online: '已上线'
},
node: {
is_cache: "缓存执行",
is_cache: '缓存执行',
is_module_path: '使用模块路径',
run_type: '运行类型',
jvm_args: '虚拟机参数',
@ -373,6 +373,7 @@ export default {
times: '次',
failed_retry_interval: '失败重试间隔',
minute: '分',
second: '秒',
delay_execution_time: '延时执行时间',
namespace_cluster: '命名空间(集群)',
min_cpu: '最小cpu',
@ -615,6 +616,7 @@ export default {
add_dependency: '添加依赖',
waiting_dependent_start: '等待依赖启动',
check_interval: '检查间隔',
check_interval_tips: '检查间隔必须为正整数',
waiting_dependent_complete: '等待依赖完成',
project_name: '项目名称',
project_name_tips: '项目名称(必填)',
@ -801,7 +803,12 @@ export default {
pipeline_name: 'pipeline名称',
factory_tips: '请选择工厂',
resource_group_tips: '请选择资源组',
pipeline_tips: '请选择pipeline'
pipeline_tips: '请选择pipeline',
dependent_failure_policy: '依赖失败策略',
dependent_failure_policy_failure: '失败',
dependent_failure_policy_waiting: '等待',
dependent_failure_waiting_time: '依赖失败等待时间',
dependent_failure_waiting_time_tips: '失败等待时间必须为正整数'
},
menu: {
fav: '收藏组件',
@ -811,6 +818,6 @@ export default {
di: '数据集成',
dq: '数据质量',
ml: '机器学习',
other: '其他',
other: '其他'
}
}

121
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts

@ -15,7 +15,7 @@
* limitations under the License.
*/
import { ref, onMounted, watch, h } from 'vue'
import { ref, onMounted, watch, h, computed } from 'vue'
import { useI18n } from 'vue-i18n'
import { NEllipsis, NIcon } from 'naive-ui'
import { useRelationCustomParams, useDependentTimeout } from '.'
@ -36,13 +36,28 @@ import type {
ITaskState,
IDateType
} from '../types'
import {IRenderOption} from "../types";
import { IRenderOption } from '../types'
export function useDependent(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const router: Router = useRouter()
const nodeStore = useTaskNodeStore()
const dependentFailurePolicyOptions = computed(() => {
return [
{
label: t('project.node.dependent_failure_policy_failure'),
value: 'DEPENDENT_FAILURE_FAILURE'
},
{
label: t('project.node.dependent_failure_policy_waiting'),
value: 'DEPENDENT_FAILURE_WAITING'
}
]
})
const failureWaitingTimeSpan = computed(() =>
model.failurePolicy === 'DEPENDENT_FAILURE_WAITING' ? 12 : 0
)
const dependentResult = nodeStore.getDependentResult
const TasksStateConfig = tasksState(t)
const projectList = ref([] as IRenderOption[])
@ -248,23 +263,26 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] {
if (!item.dependItemList?.length) return
const itemListOptions = ref([] as IDependentItemOptions[])
item.dependItemList?.forEach(async (dependItem: IDependentItem, itemIndex: number) => {
itemListOptions.value[itemIndex] = {}
if (dependItem.projectCode) {
itemListOptions.value[itemIndex].definitionCodeOptions = await getProcessList(
dependItem.projectCode
)
}
if (dependItem.projectCode && dependItem.definitionCode) {
itemListOptions.value[itemIndex].depTaskCodeOptions = await getTaskList(
dependItem.projectCode,
dependItem.definitionCode
)
}
if (dependItem.cycle) {
itemListOptions.value[itemIndex].dateOptions = DATE_LIST[dependItem.cycle]
item.dependItemList?.forEach(
async (dependItem: IDependentItem, itemIndex: number) => {
itemListOptions.value[itemIndex] = {}
if (dependItem.projectCode) {
itemListOptions.value[itemIndex].definitionCodeOptions =
await getProcessList(dependItem.projectCode)
}
if (dependItem.projectCode && dependItem.definitionCode) {
itemListOptions.value[itemIndex].depTaskCodeOptions =
await getTaskList(
dependItem.projectCode,
dependItem.definitionCode
)
}
if (dependItem.cycle) {
itemListOptions.value[itemIndex].dateOptions =
DATE_LIST[dependItem.cycle]
}
}
})
)
selectOptions.value[taskIndex] = {} as IDependTaskOptions
selectOptions.value[taskIndex].dependItemList = itemListOptions.value
})
@ -297,7 +315,9 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] {
const options = selectOptions?.value[i] || {}
const itemListOptions = options?.dependItemList || []
const itemOptions = {} as IDependentItemOptions
itemOptions.definitionCodeOptions = await getProcessList(projectCode)
itemOptions.definitionCodeOptions = await getProcessList(
projectCode
)
itemListOptions[j] = itemOptions
options.dependItemList = itemListOptions
selectOptions.value[i] = options
@ -331,15 +351,14 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] {
},
onUpdateValue: async (processCode: number) => {
const item = model.dependTaskList[i].dependItemList[j]
selectOptions.value[i].dependItemList[j].depTaskCodeOptions = await getTaskList(
item.projectCode,
processCode
)
selectOptions.value[i].dependItemList[j].depTaskCodeOptions =
await getTaskList(item.projectCode, processCode)
item.depTaskCode = 0
}
},
options: selectOptions.value[i]?.dependItemList[j]
?.definitionCodeOptions || [],
options:
selectOptions.value[i]?.dependItemList[j]
?.definitionCodeOptions || [],
path: `dependTaskList.${i}.dependItemList.${j}.definitionCode`,
rule: {
required: true,
@ -430,6 +449,56 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] {
}),
childrenField: 'dependItemList',
name: 'add_dependency'
})
}),
{
type: 'input-number',
field: 'checkInterval',
name: t('project.node.check_interval'),
span: 12,
props: {
max: Math.pow(9, 10) - 1
},
slots: {
suffix: () => t('project.node.second')
},
validate: {
trigger: ['input'],
validator(validate: any, value: number) {
if (!value && !/^[1-9]\d*$/.test(String(value))) {
return new Error(t('project.node.check_interval_tips'))
}
}
}
},
{
type: 'radio',
field: 'failurePolicy',
name: t('project.node.dependent_failure_policy'),
options: dependentFailurePolicyOptions,
span: 24
},
{
type: 'input-number',
field: 'failureWaitingTime',
name: t('project.node.dependent_failure_waiting_time'),
span: failureWaitingTimeSpan,
props: {
max: Math.pow(9, 10) - 1
},
slots: {
suffix: () => t('project.node.minute')
},
validate: {
trigger: ['input'],
required: true,
validator(validate: any, value: number) {
if (model.timeoutFlag && !/^[1-9]\d*$/.test(String(value))) {
return new Error(
t('project.node.dependent_failure_waiting_time_tips')
)
}
}
}
}
]
}

13
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -23,8 +23,7 @@ import type {
ISqoopTargetParams,
ISqoopSourceParams,
ILocalParam,
IDependTask,
RelationType
IDependentParameters
} from './types'
export function formatParams(data: INodeData): {
@ -279,6 +278,9 @@ export function formatParams(data: INodeData): {
}
if (data.taskType === 'DEPENDENT') {
taskParams.dependence = {
checkInterval: data.checkInterval,
failurePolicy: data.failurePolicy,
failureWaitingTime: data.failureWaitingTime,
relation: data.relation,
dependTaskList: data.dependTaskList
}
@ -651,7 +653,12 @@ export function formatModel(data: ITaskData) {
}
if (data.taskParams?.dependence) {
const dependence: { relation?: RelationType, dependTaskList?: IDependTask[] } = JSON.parse(JSON.stringify(data.taskParams.dependence))
const dependence: IDependentParameters = JSON.parse(
JSON.stringify(data.taskParams.dependence)
)
params.checkInterval = dependence.checkInterval
params.failurePolicy = dependence.failurePolicy
params.failureWaitingTime = dependence.failureWaitingTime
params.dependTaskList = dependence.dependTaskList || []
params.relation = dependence.relation
}

2
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dependent.ts

@ -48,6 +48,8 @@ export function useDependent({
timeoutNotifyStrategy: [],
timeout: 30,
timeoutFlag: false,
failurePolicy: 'DEPENDENT_FAILURE_FAILURE',
checkInterval: 10,
...data
} as INodeData)

17
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -107,6 +107,14 @@ interface ISwitchResult {
nextNode?: number
}
interface IDependentParameters {
checkInterval?: number
failurePolicy?: 'DEPENDENT_FAILURE_FAILURE' | 'DEPENDENT_FAILURE_WAITING'
failureWaitingTime?: number
relation?: RelationType
dependTaskList?: IDependTask[]
}
/*
* resourceName: resource full name
* res: resource file name
@ -301,10 +309,7 @@ interface ITaskParams {
switchResult?: ISwitchResult
dependTaskList?: IDependTask[]
nextNode?: number
dependence?: {
relation?: RelationType
dependTaskList?: IDependTask[]
}
dependence?: IDependentParameters
customConfig?: number
json?: string
dsType?: string
@ -441,6 +446,7 @@ interface INodeData
>,
ISqoopTargetData,
ISqoopSourceData,
IDependentParameters,
Omit<IRuleParameters, 'mapping_columns'> {
id?: string
taskType?: ITaskType
@ -519,5 +525,6 @@ export {
FormRules,
IJsonItemParams,
IResponseJsonItem,
IDateType
IDateType,
IDependentParameters
}

Loading…
Cancel
Save