Browse Source

Compatible with flink1.10 or newer (#2952)

* 兼容flink1.10以上版本

* fix null point bug

Co-authored-by: 李巨丰 <lijf@2345.com>
Co-authored-by: dailidong <dailidong66@gmail.com>
pull/2/head
lijufeng2016 4 years ago committed by GitHub
parent
commit
284c50f66c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
  2. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
  3. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
  4. 67
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
  5. 1
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  6. 1
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

13
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java

@ -90,6 +90,11 @@ public class FlinkParameters extends AbstractParameters {
*/
private String others;
/**
* flink version
*/
private String flinkVersion;
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
@ -200,6 +205,14 @@ public class FlinkParameters extends AbstractParameters {
this.programType = programType;
}
public String getFlinkVersion() {
return flinkVersion;
}
public void setFlinkVersion(String flinkVersion) {
this.flinkVersion = flinkVersion;
}
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@ -28,10 +28,12 @@ import java.util.List;
/**
* spark args utils
* flink args utils
*/
public class FlinkArgsUtils {
private static final String LOCAL_DEPLOY_MODE = "local";
private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
/**
* build args
* @param param flink parameters
@ -44,7 +46,6 @@ public class FlinkArgsUtils {
String tmpDeployMode = param.getDeployMode();
if (StringUtils.isNotEmpty(tmpDeployMode)) {
deployMode = tmpDeployMode;
}
if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
args.add(Constants.FLINK_RUN_MODE); //-m
@ -63,12 +64,15 @@ public class FlinkArgsUtils {
args.add(appName);
}
int taskManager = param.getTaskManager();
if (taskManager != 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
// judgy flink version,from flink1.10,the parameter -yn removed
String flinkVersion = param.getFlinkVersion();
if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
if (taskManager != 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
}
String jobManagerMemory = param.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(Constants.FLINK_JOB_MANAGE_MEM);

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@ -49,6 +49,7 @@ public class FlinkArgsUtilsTest {
public String mainArgs = "testArgs";
public String queue = "queue1";
public String others = "--input file:///home";
public String flinkVersion = "<1.10";
@Before
@ -79,6 +80,7 @@ public class FlinkArgsUtilsTest {
param.setMainArgs(mainArgs);
param.setQueue(queue);
param.setOthers(others);
param.setFlinkVersion(flinkVersion);
//Invoke buildArgs
List<String> result = FlinkArgsUtils.buildArgs(param);
@ -128,4 +130,4 @@ public class FlinkArgsUtilsTest {
assertEquals(5, result.size());
}
}
}

67
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@ -62,56 +62,73 @@
</x-radio-group>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Flink Version')}}</div>
<div slot="content">
<x-select
style="width: 100px;"
v-model="flinkVersion"
:disabled="isDetails">
<x-option
v-for="version in flinkVersionList"
:key="version.code"
:value="version.code"
:label="version.code">
</x-option>
</x-select>
</div>
</m-list-box>
<div class="list-box-4p">
<div class="clearfix list">
<span class="sp1">{{$t('slot')}}</span>
<span class="sp1" style="word-break:break-all">{{$t('jobManagerMemory')}}</span>
<span class="sp2">
<x-input
:disabled="isDetails"
type="input"
v-model="slot"
:placeholder="$t('Please enter driver core number')"
style="width: 200px;"
autocomplete="off">
:disabled="isDetails"
type="input"
v-model="jobManagerMemory"
:placeholder="$t('Please enter the number of Executor')"
style="width: 200px;"
autocomplete="off">
</x-input>
</span>
<span class="sp1 sp3">{{$t('taskManager')}}</span>
<span class="sp1 sp3">{{$t('taskManagerMemory')}}</span>
<span class="sp2">
<x-input
:disabled="isDetails"
type="input"
v-model="taskManager"
:placeholder="$t('Please enter driver memory use')"
style="width: 186px;"
autocomplete="off">
:disabled="isDetails"
type="input"
v-model="taskManagerMemory"
:placeholder="$t('Please enter the Executor memory')"
style="width: 186px;"
autocomplete="off">
</x-input>
</span>
</div>
<div class="clearfix list">
<span class="sp1" style="word-break:break-all">{{$t('jobManagerMemory')}}</span>
<span class="sp1">{{$t('slot')}}</span>
<span class="sp2">
<x-input
:disabled="isDetails"
type="input"
v-model="jobManagerMemory"
:placeholder="$t('Please enter the number of Executor')"
v-model="slot"
:placeholder="$t('Please enter driver core number')"
style="width: 200px;"
autocomplete="off">
</x-input>
</span>
<span class="sp1 sp3">{{$t('taskManagerMemory')}}</span>
<div v-if="flinkVersion !== '>=1.10'">
<span class="sp1 sp3">{{$t('taskManager')}}</span>
<span class="sp2">
<x-input
:disabled="isDetails"
type="input"
v-model="taskManagerMemory"
:placeholder="$t('Please enter the Executor memory')"
v-model="taskManager"
:placeholder="$t('Please enter driver memory use')"
style="width: 186px;"
autocomplete="off">
</x-input>
</span>
</div>
</div>
</div>
<m-list-box>
<div slot="text">{{$t('Command-line parameters')}}</div>
@ -207,6 +224,11 @@
programType: 'SCALA',
// Program type(List)
programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }],
flinkVersion:'<1.10',
// Flink Versions(List)
flinkVersionList: [{ code: '<1.10' }, { code: '>=1.10' }],
normalizer(node) {
return {
label: node.name
@ -324,6 +346,7 @@
return {id: v}
}),
localParams: this.localParams,
flinkVersion: this.flinkVersion,
slot: this.slot,
taskManager: this.taskManager,
jobManagerMemory: this.jobManagerMemory,
@ -485,11 +508,13 @@
this.mainJar = o.params.mainJar.id || ''
}
this.deployMode = o.params.deployMode || ''
this.flinkVersion = o.params.flinkVersion || '<1.10'
this.slot = o.params.slot || 1
this.taskManager = o.params.taskManager || '2'
this.jobManagerMemory = o.params.jobManagerMemory || '1G'
this.taskManagerMemory = o.params.taskManagerMemory || '2G'
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'SCALA'

1
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -98,6 +98,7 @@ export default {
Script: 'Script',
'Please enter script(required)': 'Please enter script(required)',
'Deploy Mode': 'Deploy Mode',
'Flink Version':'Flink Version',
'Driver core number': 'Driver core number',
'Please enter driver core number': 'Please enter driver core number',
'Driver memory use': 'Driver memory use',

1
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -99,6 +99,7 @@ export default {
Script: '脚本',
'Please enter script(required)': '请输入脚本(必填)',
'Deploy Mode': '部署方式',
'Flink Version': 'Flink版本',
'Driver core number': 'Driver内核数',
'Please enter driver core number': '请输入Driver内核数',
'Driver memory use': 'Driver内存数',

Loading…
Cancel
Save