From c0348a3cdf18aca020251806dcb5541884faed29 Mon Sep 17 00:00:00 2001 From: Devosend Date: Sat, 19 Feb 2022 17:18:55 +0800 Subject: [PATCH] [Feature][UI Next] Add flink task (#8446) * Add flink task * delete redundant file --- .../src/locales/modules/en_US.ts | 18 +- .../src/locales/modules/zh_CN.ts | 17 +- .../src/service/modules/resources/index.ts | 4 +- .../task/components/node/fields/index.ts | 1 + .../task/components/node/fields/use-flink.ts | 343 ++++++++++++++++++ .../task/components/node/format-data.ts | 32 +- .../task/components/node/tasks/use-flink.ts | 88 +++++ .../projects/task/components/node/types.ts | 6 + .../projects/task/components/node/use-task.ts | 9 + 9 files changed, 502 insertions(+), 16 deletions(-) create mode 100644 dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts create mode 100644 dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts diff --git a/dolphinscheduler-ui-next/src/locales/modules/en_US.ts b/dolphinscheduler-ui-next/src/locales/modules/en_US.ts index 8ca116ac72..1834718a11 100644 --- a/dolphinscheduler-ui-next/src/locales/modules/en_US.ts +++ b/dolphinscheduler-ui-next/src/locales/modules/en_US.ts @@ -637,7 +637,23 @@ const project = { main_arguments_tips: 'Please enter main arguments', option_parameters: 'Option Parameters', option_parameters_tips: 'Please enter option parameters', - positive_integer_tips: 'should be a positive integer' + positive_integer_tips: 'should be a positive integer', + flink_version: 'Flink Version', + job_manager_memory: 'JobManager Memory', + job_manager_memory_tips: 'Please enter JobManager memory', + task_manager_memory: 'TaskManager Memory', + task_manager_memory_tips: 'Please enter TaskManager memory', + slot_number: 'Slot Number', + slot_number_tips: 'Please enter Slot number', + parallelism: 'Parallelism', + custom_parallelism: 'Configure parallelism', + parallelism_tips: 'Please enter Parallelism', + parallelism_number_tips: 'Parallelism number should be positive integer', + parallelism_complement_tips: + 'If there are a large number of tasks requiring complement, you can use the custom parallelism to ' + + 'set the complement task thread to a reasonable value to avoid too large impact on the server.', + task_manager_number: 'TaskManager Number', + task_manager_number_tips: 'Please enter TaskManager number' } } diff --git a/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts b/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts index 3103a53857..83a93c149a 100644 --- a/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts +++ b/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts @@ -630,7 +630,22 @@ const project = { main_arguments_tips: '请输入主程序参数', option_parameters: '选项参数', option_parameters_tips: '请输入选项参数', - positive_integer_tips: '应为正整数' + positive_integer_tips: '应为正整数', + flink_version: 'Flink版本', + job_manager_memory: 'JobManager内存数', + job_manager_memory_tips: '请输入JobManager内存数', + task_manager_memory: 'TaskManager内存数', + task_manager_memory_tips: '请输入TaskManager内存数', + slot_number: 'Slot数量', + slot_number_tips: '请输入Slot数量', + parallelism: '并行度', + custom_parallelism: '自定义并行度', + parallelism_tips: '请输入并行度', + parallelism_number_tips: '并行度必须为正整数', + parallelism_complement_tips: + '如果存在大量任务需要补数时,可以利用自定义并行度将补数的任务线程设置成合理的数值,避免对服务器造成过大的影响', + task_manager_number: 'TaskManager数量', + task_manager_number_tips: '请输入TaskManager数量' } } diff --git a/dolphinscheduler-ui-next/src/service/modules/resources/index.ts b/dolphinscheduler-ui-next/src/service/modules/resources/index.ts index a662ab6095..62bb592448 100644 --- a/dolphinscheduler-ui-next/src/service/modules/resources/index.ts +++ b/dolphinscheduler-ui-next/src/service/modules/resources/index.ts @@ -118,7 +118,9 @@ export function onlineCreateResource( }) } -export function queryResourceByProgramType(params: ResourceTypeReq): any { +export function queryResourceByProgramType( + params: ResourceTypeReq & ProgramTypeReq +): any { return axios({ url: '/resources/query-by-type', method: 'get', diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts index 6aaf106533..3d452260b5 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts @@ -33,3 +33,4 @@ export { useChildNode } from './use-child-node' export { useShell } from './use-shell' export { useSpark } from './use-spark' export { useMr } from './use-mr' +export { useFlink } from './use-flink' diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts new file mode 100644 index 0000000000..8b2c829d75 --- /dev/null +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { ref, onMounted, computed } from 'vue' +import { useI18n } from 'vue-i18n' +import { queryResourceByProgramType } from '@/service/modules/resources' +import { removeUselessChildren } from './use-shell' +import type { IJsonItem } from '../types' + +export function useFlink(model: { [field: string]: any }): IJsonItem[] { + const { t } = useI18n() + + const mainClassSpan = computed(() => + model.programType === 'PYTHON' ? 0 : 24 + ) + + const taskManagerNumberSpan = computed(() => + model.flinkVersion === '<1.10' && model.deployMode === 'cluster' ? 12 : 0 + ) + + const deployModeSpan = computed(() => + model.deployMode === 'cluster' ? 12 : 0 + ) + + const mainJarOptions = ref([]) + const resources: { [field: string]: any } = {} + + const getResourceList = async (programType: string) => { + if (resources[programType] !== void 0) { + mainJarOptions.value = resources[programType] + return + } + try { + const res = await queryResourceByProgramType({ + type: 'FILE', + programType + }) + removeUselessChildren(res) + mainJarOptions.value = res || [] + resources[programType] = res + } catch (err) {} + } + + onMounted(() => { + getResourceList(model.programType) + }) + + return [ + { + type: 'select', + field: 'programType', + span: 12, + name: t('project.node.program_type'), + options: PROGRAM_TYPES, + props: { + 'on-update:value': (value: string) => { + model.mainJar = null + model.mainClass = '' + getResourceList(value) + } + }, + value: model.programType + }, + { + type: 'input', + field: 'mainClass', + span: mainClassSpan, + name: t('project.node.main_class'), + props: { + placeholder: t('project.node.main_class_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: model.programType !== 'PYTHON', + validator(validate: any, value: string) { + if (model.programType !== 'PYTHON' && !value) { + return new Error(t('project.node.main_class_tips')) + } + } + } + }, + { + type: 'tree-select', + field: 'mainJar', + name: t('project.node.main_package'), + props: { + cascade: true, + showPath: true, + checkStrategy: 'child', + placeholder: t('project.node.main_package_tips'), + keyField: 'id', + labelField: 'fullName' + }, + validate: { + trigger: ['input', 'blur'], + required: model.programType !== 'PYTHON', + validator(validate: any, value: string) { + if (!value) { + return new Error(t('project.node.main_package_tips')) + } + } + }, + options: mainJarOptions + }, + { + type: 'radio', + field: 'deployMode', + name: t('project.node.deploy_mode'), + options: DeployModes + }, + { + type: 'select', + field: 'flinkVersion', + span: 12, + name: t('project.node.flink_version'), + options: FLINK_VERSIONS, + value: model.flinkVersion + }, + { + type: 'input', + field: 'appName', + name: t('project.node.app_name'), + props: { + placeholder: t('project.node.app_name_tips') + } + }, + { + type: 'input', + field: 'jobManagerMemory', + name: t('project.node.job_manager_memory'), + span: deployModeSpan, + props: { + placeholder: t('project.node.job_manager_memory_tips'), + min: 1 + }, + validate: { + trigger: ['input', 'blur'], + validator(validate: any, value: string) { + if (!value) { + return + } + if (!Number.isInteger(parseInt(value))) { + return new Error( + t('project.node.job_manager_memory_tips') + + t('project.node.positive_integer_tips') + ) + } + } + } + }, + { + type: 'input', + field: 'taskManagerMemory', + name: t('project.node.task_manager_memory'), + span: deployModeSpan, + props: { + placeholder: t('project.node.task_manager_memory_tips') + }, + validate: { + trigger: ['input', 'blur'], + validator(validate: any, value: string) { + if (!value) { + return + } + if (!Number.isInteger(parseInt(value))) { + return new Error( + t('project.node.task_manager_memory') + + t('project.node.positive_integer_tips') + ) + } + } + }, + value: model.taskManagerMemory + }, + { + type: 'input-number', + field: 'slot', + name: t('project.node.slot_number'), + span: deployModeSpan, + props: { + placeholder: t('project.node.slot_number_tips'), + min: 1 + }, + value: model.slot + }, + { + type: 'input-number', + field: 'taskManager', + name: t('project.node.task_manager_number'), + span: taskManagerNumberSpan, + props: { + placeholder: t('project.node.task_manager_number_tips') + }, + value: model.taskManager + }, + { + type: 'input-number', + field: 'parallelism', + name: t('project.node.parallelism'), + span: 12, + props: { + placeholder: t('project.node.parallelism_tips'), + min: 1 + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if (!value) { + return new Error(t('project.node.parallelism_tips')) + } + } + }, + value: model.parallelism + }, + { + type: 'input', + field: 'mainArgs', + name: t('project.node.main_arguments'), + props: { + type: 'textarea', + placeholder: t('project.node.main_arguments_tips') + } + }, + { + type: 'input', + field: 'others', + name: t('project.node.option_parameters'), + props: { + type: 'textarea', + placeholder: t('project.node.option_parameters_tips') + } + }, + { + type: 'tree-select', + field: 'resourceList', + name: t('project.node.resources'), + options: mainJarOptions, + props: { + multiple: true, + checkable: true, + cascade: true, + showPath: true, + checkStrategy: 'child', + placeholder: t('project.node.resources_tips'), + keyField: 'id', + labelField: 'name' + } + }, + { + type: 'custom-parameters', + field: 'localParams', + name: t('project.node.custom_parameters'), + children: [ + { + type: 'input', + field: 'prop', + span: 10, + props: { + placeholder: t('project.node.prop_tips'), + maxLength: 256 + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if (!value) { + return new Error(t('project.node.prop_tips')) + } + + const sameItems = model.localParams.filter( + (item: { prop: string }) => item.prop === value + ) + + if (sameItems.length > 1) { + return new Error(t('project.node.prop_repeat')) + } + } + } + }, + { + type: 'input', + field: 'value', + span: 10, + props: { + placeholder: t('project.node.value_tips'), + maxLength: 256 + } + } + ] + } + ] +} + +const PROGRAM_TYPES = [ + { + label: 'JAVA', + value: 'JAVA' + }, + { + label: 'SCALA', + value: 'SCALA' + }, + { + label: 'PYTHON', + value: 'PYTHON' + } +] + +const FLINK_VERSIONS = [ + { + label: '<1.10', + value: '<1.10' + }, + { + label: '>=1.10', + value: '>=1.10' + } +] + +const DeployModes = [ + { + label: 'cluster', + value: 'cluster' + }, + { + label: 'local', + value: 'local' + } +] diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts index a7af76649d..add6196d8b 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts @@ -24,32 +24,38 @@ export function formatParams(data: INodeData): { taskDefinitionJsonObj: object } { const taskParams: ITaskParams = {} - if (data.taskType === 'SPARK') { + if ( + data.taskType === 'SPARK' || + data.taskType === 'MR' || + data.taskType === 'FLINK' + ) { taskParams.programType = data.programType - taskParams.sparkVersion = data.sparkVersion taskParams.mainClass = data.mainClass if (data.mainJar) { taskParams.mainJar = { id: data.mainJar } } taskParams.deployMode = data.deployMode taskParams.appName = data.appName + taskParams.mainArgs = data.mainArgs + taskParams.others = data.others + } + + if (data.taskType === 'SPARK') { + taskParams.sparkVersion = data.sparkVersion taskParams.driverCores = data.driverCores taskParams.driverMemory = data.driverMemory taskParams.numExecutors = data.numExecutors taskParams.executorMemory = data.executorMemory taskParams.executorCores = data.executorCores - taskParams.mainArgs = data.mainArgs - taskParams.others = data.others } - if (data.taskType === 'MR') { - taskParams.programType = data.programType - taskParams.mainClass = data.mainClass - if (data.mainJar) { - taskParams.mainJar = { id: data.mainJar } - } - taskParams.appName = data.appName - taskParams.mainArgs = data.mainArgs - taskParams.others = data.others + + if (data.taskType === 'FLINK') { + taskParams.flinkVersion = data.flinkVersion + taskParams.jobManagerMemory = data.jobManagerMemory + taskParams.taskManagerMemory = data.taskManagerMemory + taskParams.slot = data.slot + taskParams.taskManager = data.taskManager + taskParams.parallelism = data.parallelism } const params = { diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts new file mode 100644 index 0000000000..5b0f7e0023 --- /dev/null +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { reactive } from 'vue' +import * as Fields from '../fields/index' +import type { IJsonItem, INodeData, ITaskData } from '../types' + +export function useFlink({ + projectCode, + from = 0, + readonly, + data +}: { + projectCode: number + from?: number + readonly?: boolean + data?: ITaskData +}) { + const model = reactive({ + name: '', + flag: 'YES', + description: '', + timeoutFlag: false, + localParams: [], + environmentCode: null, + failRetryInterval: 1, + failRetryTimes: 0, + workerGroup: 'default', + delayTime: 0, + timeout: 30, + programType: 'SCALA', + deployMode: 'cluster', + flinkVersion: '<1.10', + jobManagerMemory: '1G', + taskManagerMemory: '2G', + slot: 1, + taskManager: 2, + parallelism: 1 + }) + + let extra: IJsonItem[] = [] + if (from === 1) { + extra = [ + Fields.useTaskType(model, readonly), + Fields.useProcessName({ + model, + projectCode, + isCreate: !data?.id, + from, + processName: data?.processName, + code: data?.code + }) + ] + } + + return { + json: [ + Fields.useName(), + ...extra, + Fields.useRunFlag(), + Fields.useDescription(), + Fields.useTaskPriority(), + Fields.useWorkerGroup(), + Fields.useEnvironmentName(model, !data?.id), + ...Fields.useTaskGroup(model, projectCode), + ...Fields.useFailed(), + Fields.useDelayTime(model), + ...Fields.useTimeoutAlarm(model), + ...Fields.useFlink(model), + Fields.usePreTasks(model) + ] as IJsonItem[], + model + } +} diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts index 18f2312228..43fb59764f 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts @@ -47,6 +47,12 @@ interface ITaskParams { rawScript?: string programType?: string sparkVersion?: string + flinkVersion?: string + jobManagerMemory?: string + taskManagerMemory?: string + slot?: number + taskManager?: number + parallelism?: number mainClass?: string deployMode?: string appName?: string diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts index 354dd438a6..b010229935 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts @@ -15,6 +15,7 @@ * limitations under the License. */ +import { useFlink } from './tasks/use-flink' import { useShell } from './tasks/use-shell' import { useSubProcess } from './tasks/use-sub-process' import { usePython } from './tasks/use-python' @@ -75,5 +76,13 @@ export function useTask({ data }) } + if (taskType === 'FLINK') { + node = useFlink({ + projectCode, + from, + readonly, + data + }) + } return node }