From d3bf2ee80abed7444033a617248aaa9315ead7ba Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Mon, 1 Feb 2021 11:08:14 +0800 Subject: [PATCH] [1.3.5-prepare][cherry-pick]#4166 flink args build problem bug and support flink 1.10 or newer (#4611) * [1.3.5-prepare][cherry-pick]#4166 * [1.3.5-prepare][cherry-pick]#4166 fix unit tests --- dolphinscheduler-api/pom.xml | 6 ++ .../dolphinscheduler/common/Constants.java | 11 ++-- .../common/task/flink/FlinkParameters.java | 18 ++++-- .../common/utils/OSUtilsTest.java | 2 +- .../server/utils/ArgsUtils.java | 30 +++++++++ .../server/utils/FlinkArgsUtils.java | 52 ++++++++-------- .../server/worker/task/flink/FlinkTask.java | 4 +- .../server/utils/FlinkArgsUtilsTest.java | 26 ++++---- dolphinscheduler-ui/package.json | 2 + .../dag/_source/formModel/tasks/flink.vue | 61 +++++++++++++------ .../src/js/module/i18n/locale/en_US.js | 5 ++ .../src/js/module/i18n/locale/zh_CN.js | 7 ++- pom.xml | 17 +++--- 13 files changed, 162 insertions(+), 79 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ArgsUtils.java diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml index f0f53aabd8..8abe3831bd 100644 --- a/dolphinscheduler-api/pom.xml +++ b/dolphinscheduler-api/pom.xml @@ -171,6 +171,12 @@ org.apache.hadoop hadoop-client + + + org.slf4j + slf4j-log4j12 + + diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index d67fc5754c..ebe5a60ee5 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -596,12 +596,6 @@ public final class Constants { public static final String SPARK_QUEUE = "--queue"; - /** - * --queue --qu - */ - public static final String FLINK_QUEUE = "--qu"; - - /** * exit code success */ @@ -821,11 +815,14 @@ public final class Constants { */ public static final String HIVE_CONF = "hiveconf:"; - //flink ?? + /** + * flink + */ public static final String FLINK_YARN_CLUSTER = "yarn-cluster"; public static final String FLINK_RUN_MODE = "-m"; public static final String FLINK_YARN_SLOT = "-ys"; public static final String FLINK_APP_NAME = "-ynm"; + public static final String FLINK_QUEUE = "-yqu"; public static final String FLINK_TASK_MANAGE = "-yn"; public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java index 05cbb1d794..231dd33146 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java @@ -19,13 +19,12 @@ package org.apache.dolphinscheduler.common.task.flink; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import java.util.ArrayList; import java.util.List; /** - * spark parameters + * flink parameters */ public class FlinkParameters extends AbstractParameters { @@ -90,6 +89,11 @@ public class FlinkParameters extends AbstractParameters { */ private String others; + /** + * flink version + */ + private String flinkVersion; + /** * program type * 0 JAVA,1 SCALA,2 PYTHON @@ -200,6 +204,14 @@ public class FlinkParameters extends AbstractParameters { this.programType = programType; } + public String getFlinkVersion() { + return flinkVersion; + } + + public void setFlinkVersion(String flinkVersion) { + this.flinkVersion = flinkVersion; + } + @Override public boolean checkParameters() { return mainJar != null && programType != null; @@ -213,6 +225,4 @@ public class FlinkParameters extends AbstractParameters { } return resourceList; } - - } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java index 902e3cf320..ac81722b74 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java @@ -48,7 +48,7 @@ public class OSUtilsTest { double memoryUsage = OSUtils.memoryUsage(); Assert.assertTrue(memoryUsage > 0.0f); double cpuUsage = OSUtils.cpuUsage(); - Assert.assertTrue(cpuUsage > 0.0f); + Assert.assertTrue(cpuUsage >= 0.0f); } @Test diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ArgsUtils.java new file mode 100644 index 0000000000..d71eb54f3c --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ArgsUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +public class ArgsUtils { + + private ArgsUtils() throws IllegalStateException { + throw new IllegalStateException("Utility class"); + } + + public static String escape(String arg) { + return arg.replace(" ", "\\ ").replace("\"", "\\\"").replace("'", "\\'"); + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index 12c7eb2d56..2431eedd16 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.server.utils; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.Constants; @@ -26,12 +26,13 @@ import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import java.util.ArrayList; import java.util.List; - /** - * spark args utils + * flink args utils */ public class FlinkArgsUtils { private static final String LOCAL_DEPLOY_MODE = "local"; + private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; + /** * build args * @param param flink parameters @@ -44,8 +45,8 @@ public class FlinkArgsUtils { String tmpDeployMode = param.getDeployMode(); if (StringUtils.isNotEmpty(tmpDeployMode)) { deployMode = tmpDeployMode; - } + String others = param.getOthers(); if (!LOCAL_DEPLOY_MODE.equals(deployMode)) { args.add(Constants.FLINK_RUN_MODE); //-m @@ -60,15 +61,18 @@ public class FlinkArgsUtils { String appName = param.getAppName(); if (StringUtils.isNotEmpty(appName)) { //-ynm args.add(Constants.FLINK_APP_NAME); - args.add(appName); + args.add(ArgsUtils.escape(appName)); } - int taskManager = param.getTaskManager(); - if (taskManager != 0) { //-yn - args.add(Constants.FLINK_TASK_MANAGE); - args.add(String.format("%d", taskManager)); + // judge flink version,from flink1.10,the parameter -yn removed + String flinkVersion = param.getFlinkVersion(); + if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) { + int taskManager = param.getTaskManager(); + if (taskManager != 0) { //-yn + args.add(Constants.FLINK_TASK_MANAGE); + args.add(String.format("%d", taskManager)); + } } - String jobManagerMemory = param.getJobManagerMemory(); if (StringUtils.isNotEmpty(jobManagerMemory)) { args.add(Constants.FLINK_JOB_MANAGE_MEM); @@ -81,10 +85,23 @@ public class FlinkArgsUtils { args.add(taskManagerMemory); } + if (StringUtils.isEmpty(others) || !others.contains(Constants.FLINK_QUEUE)) { + String queue = param.getQueue(); + if (StringUtils.isNotEmpty(queue)) { // -yqu + args.add(Constants.FLINK_QUEUE); + args.add(queue); + } + } + args.add(Constants.FLINK_DETACH); //-d } + // -p -s -yqu -yat -sae -yD -D + if (StringUtils.isNotEmpty(others)) { + args.add(others); + } + ProgramType programType = param.getProgramType(); String mainClass = param.getMainClass(); if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { @@ -102,21 +119,6 @@ public class FlinkArgsUtils { args.add(mainArgs); } - // --files --conf --libjar ... - String others = param.getOthers(); - String queue = param.getQueue(); - if (StringUtils.isNotEmpty(others)) { - - if (!others.contains(Constants.FLINK_QUEUE) && StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) { - args.add(Constants.FLINK_QUEUE); - args.add(param.getQueue()); - } - args.add(others); - } else if (StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) { - args.add(Constants.FLINK_QUEUE); - args.add(param.getQueue()); - } - return args; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index c377d5fa68..9de28e3e27 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; -import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; @@ -42,6 +42,7 @@ public class FlinkTask extends AbstractYarnTask { /** * flink command + * usage: flink run [OPTIONS] */ private static final String FLINK_COMMAND = "flink"; private static final String FLINK_RUN = "run"; @@ -102,6 +103,7 @@ public class FlinkTask extends AbstractYarnTask { */ @Override protected String buildCommand() { + // flink run [OPTIONS] List args = new ArrayList<>(); args.add(FLINK_COMMAND); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java index 2e4861e2a2..bea6775e5c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java @@ -46,9 +46,10 @@ public class FlinkArgsUtilsTest { public ProgramType programType = ProgramType.JAVA; public String mainClass = "com.test"; public ResourceInfo mainJar = null; - public String mainArgs = "testArgs"; + public String mainArgs = "testArgs --input file:///home"; public String queue = "queue1"; - public String others = "--input file:///home"; + public String others = "-p 4"; + public String flinkVersion = "<1.10"; @Before @@ -79,6 +80,7 @@ public class FlinkArgsUtilsTest { param.setMainArgs(mainArgs); param.setQueue(queue); param.setOthers(others); + param.setFlinkVersion(flinkVersion); //Invoke buildArgs List result = FlinkArgsUtils.buildArgs(param); @@ -107,20 +109,20 @@ public class FlinkArgsUtilsTest { assertEquals("-ytm", result.get(10)); assertEquals(result.get(11),taskManagerMemory); - assertEquals("-d", result.get(12)); + assertEquals("-yqu", result.get(12)); + assertEquals(result.get(13),queue); - assertEquals("-c", result.get(13)); - assertEquals(result.get(14),mainClass); + assertEquals("-d", result.get(14)); - assertEquals(result.get(15),mainJar.getRes()); - assertEquals(result.get(16),mainArgs); + assertEquals(result.get(15),others); - assertEquals("--qu", result.get(17)); - assertEquals(result.get(18),queue); + assertEquals("-c", result.get(16)); + assertEquals(result.get(17),mainClass); - assertEquals(result.get(19),others); + assertEquals(result.get(18),mainJar.getRes()); + assertEquals(result.get(19),mainArgs); - //Others param without --qu + //Others param without -yqu FlinkParameters param1 = new FlinkParameters(); param1.setQueue(queue); param1.setDeployMode(mode); @@ -128,4 +130,4 @@ public class FlinkArgsUtilsTest { assertEquals(5, result.size()); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-ui/package.json b/dolphinscheduler-ui/package.json index 03ce0d99cb..ad08a14896 100644 --- a/dolphinscheduler-ui/package.json +++ b/dolphinscheduler-ui/package.json @@ -3,6 +3,8 @@ "version": "1.0.0", "description": "A vue.js project", "author": "DolphinScheduler", + "repository": "https://github.com/apache/incubator-dolphinscheduler", + "license": "Apache-2.0", "scripts": { "build": "npm run clean && cross-env NODE_ENV=production webpack --config ./build/webpack.config.prod.js", "dev": "cross-env NODE_ENV=development webpack-dev-server --config ./build/webpack.config.dev.js", diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue index 36549311e4..1dae35b76b 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -63,56 +63,71 @@ -
+ +
{{$t('Flink Version')}}
+
+ + + + +
+
+
- {{$t('slot')}} + {{$t('jobManagerMemory')}} - + - {{$t('taskManager')}} + {{$t('taskManagerMemory')}} - +
- {{$t('jobManagerMemory')}} + {{$t('slot')}} - + - {{$t('taskManagerMemory')}} - + {{$t('taskManager')}} + - +
-
{{$t('Command-line parameters')}}
@@ -210,6 +225,11 @@ programType: 'SCALA', // Program type(List) programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }], + + flinkVersion: '<1.10', + // Flink Versions(List) + flinkVersionList: [{ code: '<1.10' }, { code: '>=1.10' }], + normalizer(node) { return { label: node.name @@ -337,6 +357,7 @@ return {id: v} }), localParams: this.localParams, + flinkVersion: this.flinkVersion, slot: this.slot, taskManager: this.taskManager, jobManagerMemory: this.jobManagerMemory, @@ -467,6 +488,7 @@ deployMode: this.deployMode, resourceList: resourceIdArr, localParams: this.localParams, + flinkVersion: this.flinkVersion, slot: this.slot, taskManager: this.taskManager, jobManagerMemory: this.jobManagerMemory, @@ -508,6 +530,7 @@ this.mainJar = o.params.mainJar.id || '' } this.deployMode = o.params.deployMode || '' + this.flinkVersion = o.params.flinkVersion || '<1.10' this.slot = o.params.slot || 1 this.taskManager = o.params.taskManager || '2' this.jobManagerMemory = o.params.jobManagerMemory || '1G' diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index e5ac4375d7..452c55b0da 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -97,6 +97,7 @@ export default { Script: 'Script', 'Please enter script(required)': 'Please enter script(required)', 'Deploy Mode': 'Deploy Mode', + 'Flink Version': 'Flink Version', 'Driver core number': 'Driver core number', 'Please enter driver core number': 'Please enter driver core number', 'Driver memory use': 'Driver memory use', @@ -111,6 +112,10 @@ export default { 'Memory should be a positive integer': 'Memory should be a positive integer', 'Please enter ExecutorPlease enter Executor core number': 'Please enter ExecutorPlease enter Executor core number', 'Core number should be positive integer': 'Core number should be positive integer', + 'Please enter jobManager memory': 'Please enter jobManager memory', + 'Please enter the taskManager memory': 'Please enter the taskManager memory', + 'Please enter slot number': 'Please enter slot number', + 'Please enter taskManager number': 'Please enter taskManager number', 'SQL Type': 'SQL Type', Title: 'Title', 'Please enter the title of email': 'Please enter the title of email', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 7ad7e0a0b0..79a52055d7 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -98,6 +98,7 @@ export default { Script: '脚本', 'Please enter script(required)': '请输入脚本(必填)', 'Deploy Mode': '部署方式', + 'Flink Version': 'Flink版本', 'Driver core number': 'Driver内核数', 'Please enter driver core number': '请输入Driver内核数', 'Driver memory use': 'Driver内存数', @@ -112,6 +113,10 @@ export default { 'Memory should be a positive integer': '内存数为数字', 'Please enter ExecutorPlease enter Executor core number': '请填写Executor内核数', 'Core number should be positive integer': '内核数为正整数', + 'Please enter jobManager memory': '请输入JobManager内存数', + 'Please enter the taskManager memory': '请输入TaskManager内存数', + 'Please enter slot number': '请输入slot数量', + 'Please enter taskManager number': '请输入taskManager数量', 'SQL Type': 'sql类型', Title: '主题', 'Please enter the title of email': '请输入邮件主题', @@ -515,7 +520,7 @@ export default { 'Execute time': '执行时间', 'Complement range': '补数范围', slot: 'slot数量', - taskManager: 'taskManage数量', + taskManager: 'taskManager数量', jobManagerMemory: 'jobManager内存数', taskManagerMemory: 'taskManager内存数', 'Http Url': '请求地址', diff --git a/pom.xml b/pom.xml index 6833fd427c..e240910ecd 100644 --- a/pom.xml +++ b/pom.xml @@ -795,18 +795,17 @@ **/server/log/TaskLogFilterTest.java **/server/log/WorkerLogFilterTest.java **/server/master/consumer/TaskPriorityQueueConsumerTest.java - **/server/master/runner/MasterTaskExecThreadTest.java - **/server/worker/runner/TaskExecuteThreadTest.java - **/server/master/dispatch/executor/NettyExecutorManagerTest.java + **/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java **/server/master/dispatch/host/assign/RandomSelectorTest.java **/server/master/dispatch/host/assign/RoundRobinSelectorTest.java **/server/master/register/MasterRegistryTest.java **/server/master/AlertManagerTest.java **/server/master/MasterCommandTest.java - **/server/master/DependentTaskTest.java + **/server/master/ParamsTest.java **/server/register/ZookeeperNodeManagerTest.java **/server/utils/DataxUtilsTest.java @@ -815,7 +814,8 @@ **/server/utils/ParamUtilsTest.java **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java - **/server/worker/processor/TaskCallbackServiceTest.java + **/server/worker/registry/WorkerRegistryTest.java **/server/worker/shell/ShellCommandExecutorTest.java **/server/worker/sql/SqlExecutorTest.java @@ -829,7 +829,6 @@ **/service/zk/DefaultEnsembleProviderTest.java **/service/zk/ZKServerTest.java **/service/queue/TaskUpdateQueueTest.java - **/dao/mapper/DataSourceUserMapperTest.java **/dao/mapper/ErrorCommandMapperTest.java **/dao/mapper/ProcessDefinitionMapperTest.java @@ -854,9 +853,9 @@ **/dao/upgrade/UpgradeDaoTest.java **/plugin/model/AlertDataTest.java **/plugin/model/AlertInfoTest.java - **/plugin/utils/PropertyUtilsTest.java + - true +