Browse Source

[1.3.5-prepare][cherry-pick]#4166 flink args build problem bug and support flink 1.10 or newer (#4611)

* [1.3.5-prepare][cherry-pick]#4166

* [1.3.5-prepare][cherry-pick]#4166 fix unit tests
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
d3bf2ee80a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/pom.xml
  2. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  3. 18
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
  4. 2
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
  5. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ArgsUtils.java
  6. 52
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
  7. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
  8. 26
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
  9. 2
      dolphinscheduler-ui/package.json
  10. 61
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
  11. 5
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  12. 7
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  13. 17
      pom.xml

6
dolphinscheduler-api/pom.xml

@ -171,6 +171,12 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -596,12 +596,6 @@ public final class Constants {
public static final String SPARK_QUEUE = "--queue";
/**
* --queue --qu
*/
public static final String FLINK_QUEUE = "--qu";
/**
* exit code success
*/
@ -821,11 +815,14 @@ public final class Constants {
*/
public static final String HIVE_CONF = "hiveconf:";
//flink ??
/**
* flink
*/
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE = "-yqu";
public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";

18
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java

@ -19,13 +19,12 @@ package org.apache.dolphinscheduler.common.task.flink;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
* spark parameters
* flink parameters
*/
public class FlinkParameters extends AbstractParameters {
@ -90,6 +89,11 @@ public class FlinkParameters extends AbstractParameters {
*/
private String others;
/**
* flink version
*/
private String flinkVersion;
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
@ -200,6 +204,14 @@ public class FlinkParameters extends AbstractParameters {
this.programType = programType;
}
public String getFlinkVersion() {
return flinkVersion;
}
public void setFlinkVersion(String flinkVersion) {
this.flinkVersion = flinkVersion;
}
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
@ -213,6 +225,4 @@ public class FlinkParameters extends AbstractParameters {
}
return resourceList;
}
}

2
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java

@ -48,7 +48,7 @@ public class OSUtilsTest {
double memoryUsage = OSUtils.memoryUsage();
Assert.assertTrue(memoryUsage > 0.0f);
double cpuUsage = OSUtils.cpuUsage();
Assert.assertTrue(cpuUsage > 0.0f);
Assert.assertTrue(cpuUsage >= 0.0f);
}
@Test

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ArgsUtils.java

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
public class ArgsUtils {
private ArgsUtils() throws IllegalStateException {
throw new IllegalStateException("Utility class");
}
public static String escape(String arg) {
return arg.replace(" ", "\\ ").replace("\"", "\\\"").replace("'", "\\'");
}
}

52
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.server.utils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
@ -26,12 +26,13 @@ import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import java.util.ArrayList;
import java.util.List;
/**
* spark args utils
* flink args utils
*/
public class FlinkArgsUtils {
private static final String LOCAL_DEPLOY_MODE = "local";
private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
/**
* build args
* @param param flink parameters
@ -44,8 +45,8 @@ public class FlinkArgsUtils {
String tmpDeployMode = param.getDeployMode();
if (StringUtils.isNotEmpty(tmpDeployMode)) {
deployMode = tmpDeployMode;
}
String others = param.getOthers();
if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
args.add(Constants.FLINK_RUN_MODE); //-m
@ -60,15 +61,18 @@ public class FlinkArgsUtils {
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) { //-ynm
args.add(Constants.FLINK_APP_NAME);
args.add(appName);
args.add(ArgsUtils.escape(appName));
}
int taskManager = param.getTaskManager();
if (taskManager != 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
// judge flink version,from flink1.10,the parameter -yn removed
String flinkVersion = param.getFlinkVersion();
if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
if (taskManager != 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
}
String jobManagerMemory = param.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(Constants.FLINK_JOB_MANAGE_MEM);
@ -81,10 +85,23 @@ public class FlinkArgsUtils {
args.add(taskManagerMemory);
}
if (StringUtils.isEmpty(others) || !others.contains(Constants.FLINK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) { // -yqu
args.add(Constants.FLINK_QUEUE);
args.add(queue);
}
}
args.add(Constants.FLINK_DETACH); //-d
}
// -p -s -yqu -yat -sae -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
@ -102,21 +119,6 @@ public class FlinkArgsUtils {
args.add(mainArgs);
}
// --files --conf --libjar ...
String others = param.getOthers();
String queue = param.getQueue();
if (StringUtils.isNotEmpty(others)) {
if (!others.contains(Constants.FLINK_QUEUE) && StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) {
args.add(Constants.FLINK_QUEUE);
args.add(param.getQueue());
}
args.add(others);
} else if (StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) {
args.add(Constants.FLINK_QUEUE);
args.add(param.getQueue());
}
return args;
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java

@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@ -42,6 +42,7 @@ public class FlinkTask extends AbstractYarnTask {
/**
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
private static final String FLINK_COMMAND = "flink";
private static final String FLINK_RUN = "run";
@ -102,6 +103,7 @@ public class FlinkTask extends AbstractYarnTask {
*/
@Override
protected String buildCommand() {
// flink run [OPTIONS] <jar-file> <arguments>
List<String> args = new ArrayList<>();
args.add(FLINK_COMMAND);

26
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@ -46,9 +46,10 @@ public class FlinkArgsUtilsTest {
public ProgramType programType = ProgramType.JAVA;
public String mainClass = "com.test";
public ResourceInfo mainJar = null;
public String mainArgs = "testArgs";
public String mainArgs = "testArgs --input file:///home";
public String queue = "queue1";
public String others = "--input file:///home";
public String others = "-p 4";
public String flinkVersion = "<1.10";
@Before
@ -79,6 +80,7 @@ public class FlinkArgsUtilsTest {
param.setMainArgs(mainArgs);
param.setQueue(queue);
param.setOthers(others);
param.setFlinkVersion(flinkVersion);
//Invoke buildArgs
List<String> result = FlinkArgsUtils.buildArgs(param);
@ -107,20 +109,20 @@ public class FlinkArgsUtilsTest {
assertEquals("-ytm", result.get(10));
assertEquals(result.get(11),taskManagerMemory);
assertEquals("-d", result.get(12));
assertEquals("-yqu", result.get(12));
assertEquals(result.get(13),queue);
assertEquals("-c", result.get(13));
assertEquals(result.get(14),mainClass);
assertEquals("-d", result.get(14));
assertEquals(result.get(15),mainJar.getRes());
assertEquals(result.get(16),mainArgs);
assertEquals(result.get(15),others);
assertEquals("--qu", result.get(17));
assertEquals(result.get(18),queue);
assertEquals("-c", result.get(16));
assertEquals(result.get(17),mainClass);
assertEquals(result.get(19),others);
assertEquals(result.get(18),mainJar.getRes());
assertEquals(result.get(19),mainArgs);
//Others param without --qu
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();
param1.setQueue(queue);
param1.setDeployMode(mode);
@ -128,4 +130,4 @@ public class FlinkArgsUtilsTest {
assertEquals(5, result.size());
}
}
}

2
dolphinscheduler-ui/package.json

@ -3,6 +3,8 @@
"version": "1.0.0",
"description": "A vue.js project",
"author": "DolphinScheduler",
"repository": "https://github.com/apache/incubator-dolphinscheduler",
"license": "Apache-2.0",
"scripts": {
"build": "npm run clean && cross-env NODE_ENV=production webpack --config ./build/webpack.config.prod.js",
"dev": "cross-env NODE_ENV=development webpack-dev-server --config ./build/webpack.config.dev.js",

61
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@ -63,56 +63,71 @@
</x-radio-group>
</div>
</m-list-box>
<div class="list-box-4p">
<m-list-box v-if="deployMode === 'cluster'">
<div slot="text">{{$t('Flink Version')}}</div>
<div slot="content">
<x-select
style="width: 100px;"
v-model="flinkVersion"
:disabled="isDetails">
<x-option
v-for="version in flinkVersionList"
:key="version.code"
:value="version.code"
:label="version.code">
</x-option>
</x-select>
</div>
</m-list-box>
<div class="list-box-4p" v-if="deployMode === 'cluster'">
<div class="clearfix list">
<span class="sp1">{{$t('slot')}}</span>
<span class="sp1" style="word-break:break-all">{{$t('jobManagerMemory')}}</span>
<span class="sp2">
<x-input
:disabled="isDetails"
type="input"
v-model="slot"
:placeholder="$t('Please enter driver core number')"
v-model="jobManagerMemory"
:placeholder="$t('Please enter jobManager memory')"
style="width: 200px;"
autocomplete="off">
</x-input>
</x-input>
</span>
<span class="sp1 sp3">{{$t('taskManager')}}</span>
<span class="sp1 sp3">{{$t('taskManagerMemory')}}</span>
<span class="sp2">
<x-input
:disabled="isDetails"
type="input"
v-model="taskManager"
:placeholder="$t('Please enter driver memory use')"
v-model="taskManagerMemory"
:placeholder="$t('Please enter the taskManager memory')"
style="width: 186px;"
autocomplete="off">
</x-input>
</x-input>
</span>
</div>
<div class="clearfix list">
<span class="sp1" style="word-break:break-all">{{$t('jobManagerMemory')}}</span>
<span class="sp1">{{$t('slot')}}</span>
<span class="sp2">
<x-input
:disabled="isDetails"
type="input"
v-model="jobManagerMemory"
:placeholder="$t('Please enter the number of Executor')"
v-model="slot"
:placeholder="$t('Please enter slot number')"
style="width: 200px;"
autocomplete="off">
</x-input>
</x-input>
</span>
<span class="sp1 sp3">{{$t('taskManagerMemory')}}</span>
<span class="sp2">
<span class="sp1 sp3" v-if="flinkVersion === '<1.10'">{{$t('taskManager')}}</span>
<span class="sp2" v-if="flinkVersion === '<1.10'">
<x-input
:disabled="isDetails"
type="input"
v-model="taskManagerMemory"
:placeholder="$t('Please enter the Executor memory')"
v-model="taskManager"
:placeholder="$t('Please enter taskManager number')"
style="width: 186px;"
autocomplete="off">
</x-input>
</x-input>
</span>
</div>
</div>
<m-list-box>
<div slot="text">{{$t('Command-line parameters')}}</div>
@ -210,6 +225,11 @@
programType: 'SCALA',
// Program type(List)
programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }],
flinkVersion: '<1.10',
// Flink Versions(List)
flinkVersionList: [{ code: '<1.10' }, { code: '>=1.10' }],
normalizer(node) {
return {
label: node.name
@ -337,6 +357,7 @@
return {id: v}
}),
localParams: this.localParams,
flinkVersion: this.flinkVersion,
slot: this.slot,
taskManager: this.taskManager,
jobManagerMemory: this.jobManagerMemory,
@ -467,6 +488,7 @@
deployMode: this.deployMode,
resourceList: resourceIdArr,
localParams: this.localParams,
flinkVersion: this.flinkVersion,
slot: this.slot,
taskManager: this.taskManager,
jobManagerMemory: this.jobManagerMemory,
@ -508,6 +530,7 @@
this.mainJar = o.params.mainJar.id || ''
}
this.deployMode = o.params.deployMode || ''
this.flinkVersion = o.params.flinkVersion || '<1.10'
this.slot = o.params.slot || 1
this.taskManager = o.params.taskManager || '2'
this.jobManagerMemory = o.params.jobManagerMemory || '1G'

5
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -97,6 +97,7 @@ export default {
Script: 'Script',
'Please enter script(required)': 'Please enter script(required)',
'Deploy Mode': 'Deploy Mode',
'Flink Version': 'Flink Version',
'Driver core number': 'Driver core number',
'Please enter driver core number': 'Please enter driver core number',
'Driver memory use': 'Driver memory use',
@ -111,6 +112,10 @@ export default {
'Memory should be a positive integer': 'Memory should be a positive integer',
'Please enter ExecutorPlease enter Executor core number': 'Please enter ExecutorPlease enter Executor core number',
'Core number should be positive integer': 'Core number should be positive integer',
'Please enter jobManager memory': 'Please enter jobManager memory',
'Please enter the taskManager memory': 'Please enter the taskManager memory',
'Please enter slot number': 'Please enter slot number',
'Please enter taskManager number': 'Please enter taskManager number',
'SQL Type': 'SQL Type',
Title: 'Title',
'Please enter the title of email': 'Please enter the title of email',

7
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -98,6 +98,7 @@ export default {
Script: '脚本',
'Please enter script(required)': '请输入脚本(必填)',
'Deploy Mode': '部署方式',
'Flink Version': 'Flink版本',
'Driver core number': 'Driver内核数',
'Please enter driver core number': '请输入Driver内核数',
'Driver memory use': 'Driver内存数',
@ -112,6 +113,10 @@ export default {
'Memory should be a positive integer': '内存数为数字',
'Please enter ExecutorPlease enter Executor core number': '请填写Executor内核数',
'Core number should be positive integer': '内核数为正整数',
'Please enter jobManager memory': '请输入JobManager内存数',
'Please enter the taskManager memory': '请输入TaskManager内存数',
'Please enter slot number': '请输入slot数量',
'Please enter taskManager number': '请输入taskManager数量',
'SQL Type': 'sql类型',
Title: '主题',
'Please enter the title of email': '请输入邮件主题',
@ -515,7 +520,7 @@ export default {
'Execute time': '执行时间',
'Complement range': '补数范围',
slot: 'slot数量',
taskManager: 'taskManage数量',
taskManager: 'taskManager数量',
jobManagerMemory: 'jobManager内存数',
taskManagerMemory: 'taskManager内存数',
'Http Url': '请求地址',

17
pom.xml

@ -795,18 +795,17 @@
<include>**/server/log/TaskLogFilterTest.java</include>
<include>**/server/log/WorkerLogFilterTest.java</include>
<include>**/server/master/consumer/TaskPriorityQueueConsumerTest.java</include>
<include>**/server/master/runner/MasterTaskExecThreadTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/server/master/dispatch/executor/NettyExecutorManagerTest.java</include>
<!-- <include>**/server/master/runner/MasterTaskExecThreadTest.java</include>
<include>**/server/master/dispatch/executor/NettyExecutorManagerTest.java</include> -->
<include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
<include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
<include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include>
<include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/AlertManagerTest.java</include>
<include>**/server/master/MasterCommandTest.java</include>
<include>**/server/master/DependentTaskTest.java</include>
<!-- <include>**/server/master/DependentTaskTest.java</include>
<include>**/server/master/ConditionsTaskTest.java</include>
<include>**/server/master/MasterExecThreadTest.java</include>
<include>**/server/master/MasterExecThreadTest.java</include> -->
<include>**/server/master/ParamsTest.java</include>
<include>**/server/register/ZookeeperNodeManagerTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
@ -815,7 +814,8 @@
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
<include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
<!-- <include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include> -->
<include>**/server/worker/registry/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include>
@ -829,7 +829,6 @@
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/service/zk/ZKServerTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<include>**/dao/mapper/ErrorCommandMapperTest.java</include>
<include>**/dao/mapper/ProcessDefinitionMapperTest.java</include>
@ -854,9 +853,9 @@
<include>**/dao/upgrade/UpgradeDaoTest.java</include>
<include>**/plugin/model/AlertDataTest.java</include>
<include>**/plugin/model/AlertInfoTest.java</include>
<include>**/plugin/utils/PropertyUtilsTest.java</include>
<!-- <include>**/plugin/utils/PropertyUtilsTest.java</include> -->
</includes>
<skip>true</skip>
<!-- <skip>true</skip> -->
</configuration>
</plugin>

Loading…
Cancel
Save