Browse Source

[Feature][UI Next] Add flink task (#8446)

* Add flink task

* delete redundant file
3.0.0/version-upgrade
Devosend 2 years ago committed by GitHub
parent
commit
c0348a3cdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      dolphinscheduler-ui-next/src/locales/modules/en_US.ts
  2. 17
      dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts
  3. 4
      dolphinscheduler-ui-next/src/service/modules/resources/index.ts
  4. 1
      dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts
  5. 343
      dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
  6. 32
      dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts
  7. 88
      dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts
  8. 6
      dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts
  9. 9
      dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts

18
dolphinscheduler-ui-next/src/locales/modules/en_US.ts

@ -637,7 +637,23 @@ const project = {
main_arguments_tips: 'Please enter main arguments',
option_parameters: 'Option Parameters',
option_parameters_tips: 'Please enter option parameters',
positive_integer_tips: 'should be a positive integer'
positive_integer_tips: 'should be a positive integer',
flink_version: 'Flink Version',
job_manager_memory: 'JobManager Memory',
job_manager_memory_tips: 'Please enter JobManager memory',
task_manager_memory: 'TaskManager Memory',
task_manager_memory_tips: 'Please enter TaskManager memory',
slot_number: 'Slot Number',
slot_number_tips: 'Please enter Slot number',
parallelism: 'Parallelism',
custom_parallelism: 'Configure parallelism',
parallelism_tips: 'Please enter Parallelism',
parallelism_number_tips: 'Parallelism number should be positive integer',
parallelism_complement_tips:
'If there are a large number of tasks requiring complement, you can use the custom parallelism to ' +
'set the complement task thread to a reasonable value to avoid too large impact on the server.',
task_manager_number: 'TaskManager Number',
task_manager_number_tips: 'Please enter TaskManager number'
}
}

17
dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts

@ -630,7 +630,22 @@ const project = {
main_arguments_tips: '请输入主程序参数',
option_parameters: '选项参数',
option_parameters_tips: '请输入选项参数',
positive_integer_tips: '应为正整数'
positive_integer_tips: '应为正整数',
flink_version: 'Flink版本',
job_manager_memory: 'JobManager内存数',
job_manager_memory_tips: '请输入JobManager内存数',
task_manager_memory: 'TaskManager内存数',
task_manager_memory_tips: '请输入TaskManager内存数',
slot_number: 'Slot数量',
slot_number_tips: '请输入Slot数量',
parallelism: '并行度',
custom_parallelism: '自定义并行度',
parallelism_tips: '请输入并行度',
parallelism_number_tips: '并行度必须为正整数',
parallelism_complement_tips:
'如果存在大量任务需要补数时,可以利用自定义并行度将补数的任务线程设置成合理的数值,避免对服务器造成过大的影响',
task_manager_number: 'TaskManager数量',
task_manager_number_tips: '请输入TaskManager数量'
}
}

4
dolphinscheduler-ui-next/src/service/modules/resources/index.ts

@ -118,7 +118,9 @@ export function onlineCreateResource(
})
}
export function queryResourceByProgramType(params: ResourceTypeReq): any {
export function queryResourceByProgramType(
params: ResourceTypeReq & ProgramTypeReq
): any {
return axios({
url: '/resources/query-by-type',
method: 'get',

1
dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts

@ -33,3 +33,4 @@ export { useChildNode } from './use-child-node'
export { useShell } from './use-shell'
export { useSpark } from './use-spark'
export { useMr } from './use-mr'
export { useFlink } from './use-flink'

343
dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts

@ -0,0 +1,343 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ref, onMounted, computed } from 'vue'
import { useI18n } from 'vue-i18n'
import { queryResourceByProgramType } from '@/service/modules/resources'
import { removeUselessChildren } from './use-shell'
import type { IJsonItem } from '../types'
export function useFlink(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const mainClassSpan = computed(() =>
model.programType === 'PYTHON' ? 0 : 24
)
const taskManagerNumberSpan = computed(() =>
model.flinkVersion === '<1.10' && model.deployMode === 'cluster' ? 12 : 0
)
const deployModeSpan = computed(() =>
model.deployMode === 'cluster' ? 12 : 0
)
const mainJarOptions = ref([])
const resources: { [field: string]: any } = {}
const getResourceList = async (programType: string) => {
if (resources[programType] !== void 0) {
mainJarOptions.value = resources[programType]
return
}
try {
const res = await queryResourceByProgramType({
type: 'FILE',
programType
})
removeUselessChildren(res)
mainJarOptions.value = res || []
resources[programType] = res
} catch (err) {}
}
onMounted(() => {
getResourceList(model.programType)
})
return [
{
type: 'select',
field: 'programType',
span: 12,
name: t('project.node.program_type'),
options: PROGRAM_TYPES,
props: {
'on-update:value': (value: string) => {
model.mainJar = null
model.mainClass = ''
getResourceList(value)
}
},
value: model.programType
},
{
type: 'input',
field: 'mainClass',
span: mainClassSpan,
name: t('project.node.main_class'),
props: {
placeholder: t('project.node.main_class_tips')
},
validate: {
trigger: ['input', 'blur'],
required: model.programType !== 'PYTHON',
validator(validate: any, value: string) {
if (model.programType !== 'PYTHON' && !value) {
return new Error(t('project.node.main_class_tips'))
}
}
}
},
{
type: 'tree-select',
field: 'mainJar',
name: t('project.node.main_package'),
props: {
cascade: true,
showPath: true,
checkStrategy: 'child',
placeholder: t('project.node.main_package_tips'),
keyField: 'id',
labelField: 'fullName'
},
validate: {
trigger: ['input', 'blur'],
required: model.programType !== 'PYTHON',
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.main_package_tips'))
}
}
},
options: mainJarOptions
},
{
type: 'radio',
field: 'deployMode',
name: t('project.node.deploy_mode'),
options: DeployModes
},
{
type: 'select',
field: 'flinkVersion',
span: 12,
name: t('project.node.flink_version'),
options: FLINK_VERSIONS,
value: model.flinkVersion
},
{
type: 'input',
field: 'appName',
name: t('project.node.app_name'),
props: {
placeholder: t('project.node.app_name_tips')
}
},
{
type: 'input',
field: 'jobManagerMemory',
name: t('project.node.job_manager_memory'),
span: deployModeSpan,
props: {
placeholder: t('project.node.job_manager_memory_tips'),
min: 1
},
validate: {
trigger: ['input', 'blur'],
validator(validate: any, value: string) {
if (!value) {
return
}
if (!Number.isInteger(parseInt(value))) {
return new Error(
t('project.node.job_manager_memory_tips') +
t('project.node.positive_integer_tips')
)
}
}
}
},
{
type: 'input',
field: 'taskManagerMemory',
name: t('project.node.task_manager_memory'),
span: deployModeSpan,
props: {
placeholder: t('project.node.task_manager_memory_tips')
},
validate: {
trigger: ['input', 'blur'],
validator(validate: any, value: string) {
if (!value) {
return
}
if (!Number.isInteger(parseInt(value))) {
return new Error(
t('project.node.task_manager_memory') +
t('project.node.positive_integer_tips')
)
}
}
},
value: model.taskManagerMemory
},
{
type: 'input-number',
field: 'slot',
name: t('project.node.slot_number'),
span: deployModeSpan,
props: {
placeholder: t('project.node.slot_number_tips'),
min: 1
},
value: model.slot
},
{
type: 'input-number',
field: 'taskManager',
name: t('project.node.task_manager_number'),
span: taskManagerNumberSpan,
props: {
placeholder: t('project.node.task_manager_number_tips')
},
value: model.taskManager
},
{
type: 'input-number',
field: 'parallelism',
name: t('project.node.parallelism'),
span: 12,
props: {
placeholder: t('project.node.parallelism_tips'),
min: 1
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.parallelism_tips'))
}
}
},
value: model.parallelism
},
{
type: 'input',
field: 'mainArgs',
name: t('project.node.main_arguments'),
props: {
type: 'textarea',
placeholder: t('project.node.main_arguments_tips')
}
},
{
type: 'input',
field: 'others',
name: t('project.node.option_parameters'),
props: {
type: 'textarea',
placeholder: t('project.node.option_parameters_tips')
}
},
{
type: 'tree-select',
field: 'resourceList',
name: t('project.node.resources'),
options: mainJarOptions,
props: {
multiple: true,
checkable: true,
cascade: true,
showPath: true,
checkStrategy: 'child',
placeholder: t('project.node.resources_tips'),
keyField: 'id',
labelField: 'name'
}
},
{
type: 'custom-parameters',
field: 'localParams',
name: t('project.node.custom_parameters'),
children: [
{
type: 'input',
field: 'prop',
span: 10,
props: {
placeholder: t('project.node.prop_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.prop_tips'))
}
const sameItems = model.localParams.filter(
(item: { prop: string }) => item.prop === value
)
if (sameItems.length > 1) {
return new Error(t('project.node.prop_repeat'))
}
}
}
},
{
type: 'input',
field: 'value',
span: 10,
props: {
placeholder: t('project.node.value_tips'),
maxLength: 256
}
}
]
}
]
}
const PROGRAM_TYPES = [
{
label: 'JAVA',
value: 'JAVA'
},
{
label: 'SCALA',
value: 'SCALA'
},
{
label: 'PYTHON',
value: 'PYTHON'
}
]
const FLINK_VERSIONS = [
{
label: '<1.10',
value: '<1.10'
},
{
label: '>=1.10',
value: '>=1.10'
}
]
const DeployModes = [
{
label: 'cluster',
value: 'cluster'
},
{
label: 'local',
value: 'local'
}
]

32
dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts

@ -24,32 +24,38 @@ export function formatParams(data: INodeData): {
taskDefinitionJsonObj: object
} {
const taskParams: ITaskParams = {}
if (data.taskType === 'SPARK') {
if (
data.taskType === 'SPARK' ||
data.taskType === 'MR' ||
data.taskType === 'FLINK'
) {
taskParams.programType = data.programType
taskParams.sparkVersion = data.sparkVersion
taskParams.mainClass = data.mainClass
if (data.mainJar) {
taskParams.mainJar = { id: data.mainJar }
}
taskParams.deployMode = data.deployMode
taskParams.appName = data.appName
taskParams.mainArgs = data.mainArgs
taskParams.others = data.others
}
if (data.taskType === 'SPARK') {
taskParams.sparkVersion = data.sparkVersion
taskParams.driverCores = data.driverCores
taskParams.driverMemory = data.driverMemory
taskParams.numExecutors = data.numExecutors
taskParams.executorMemory = data.executorMemory
taskParams.executorCores = data.executorCores
taskParams.mainArgs = data.mainArgs
taskParams.others = data.others
}
if (data.taskType === 'MR') {
taskParams.programType = data.programType
taskParams.mainClass = data.mainClass
if (data.mainJar) {
taskParams.mainJar = { id: data.mainJar }
}
taskParams.appName = data.appName
taskParams.mainArgs = data.mainArgs
taskParams.others = data.others
if (data.taskType === 'FLINK') {
taskParams.flinkVersion = data.flinkVersion
taskParams.jobManagerMemory = data.jobManagerMemory
taskParams.taskManagerMemory = data.taskManagerMemory
taskParams.slot = data.slot
taskParams.taskManager = data.taskManager
taskParams.parallelism = data.parallelism
}
const params = {

88
dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData, ITaskData } from '../types'
export function useFlink({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const model = reactive<INodeData>({
name: '',
flag: 'YES',
description: '',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
delayTime: 0,
timeout: 30,
programType: 'SCALA',
deployMode: 'cluster',
flinkVersion: '<1.10',
jobManagerMemory: '1G',
taskManagerMemory: '2G',
slot: 1,
taskManager: 2,
parallelism: 1
})
let extra: IJsonItem[] = []
if (from === 1) {
extra = [
Fields.useTaskType(model, readonly),
Fields.useProcessName({
model,
projectCode,
isCreate: !data?.id,
from,
processName: data?.processName,
code: data?.code
})
]
}
return {
json: [
Fields.useName(),
...extra,
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useFlink(model),
Fields.usePreTasks(model)
] as IJsonItem[],
model
}
}

6
dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts

@ -47,6 +47,12 @@ interface ITaskParams {
rawScript?: string
programType?: string
sparkVersion?: string
flinkVersion?: string
jobManagerMemory?: string
taskManagerMemory?: string
slot?: number
taskManager?: number
parallelism?: number
mainClass?: string
deployMode?: string
appName?: string

9
dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts

@ -15,6 +15,7 @@
* limitations under the License.
*/
import { useFlink } from './tasks/use-flink'
import { useShell } from './tasks/use-shell'
import { useSubProcess } from './tasks/use-sub-process'
import { usePython } from './tasks/use-python'
@ -75,5 +76,13 @@ export function useTask({
data
})
}
if (taskType === 'FLINK') {
node = useFlink({
projectCode,
from,
readonly,
data
})
}
return node
}

Loading…
Cancel
Save