Browse Source

[Fix-3457][flink] fix flink args build problem (#4166)

* [Fix][Flink] fix flink args build problem

* [Fix][Flink] fix FlinkArgsUtilsTest

* [Improvement][UI] hide version and cluster input when deployMode is local
pull/3/MERGE
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
cbc30b4900
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/resources/application-api.properties
  2. 7
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  3. 31
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
  4. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
  5. 22
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
  6. 4
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
  7. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  8. 2
      pom.xml

2
dolphinscheduler-api/src/main/resources/application-api.properties

@ -35,7 +35,7 @@ server.compression.enabled=true
server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
#post content
server.jetty.max-http-post-size=5000000
server.jetty.max-http-form-post-size=5000000
spring.messages.encoding=UTF-8

7
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -604,12 +604,6 @@ public final class Constants {
public static final String SPARK_QUEUE = "--queue";
/**
* --queue --qu
*/
public static final String FLINK_QUEUE = "--qu";
/**
* exit code success
*/
@ -838,6 +832,7 @@ public final class Constants {
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE = "-yqu";
public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";

31
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@ -47,6 +47,7 @@ public class FlinkArgsUtils {
if (StringUtils.isNotEmpty(tmpDeployMode)) {
deployMode = tmpDeployMode;
}
String others = param.getOthers();
if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
args.add(Constants.FLINK_RUN_MODE); //-m
@ -64,7 +65,7 @@ public class FlinkArgsUtils {
args.add(appName);
}
// judgy flink version,from flink1.10,the parameter -yn removed
// judge flink version,from flink1.10,the parameter -yn removed
String flinkVersion = param.getFlinkVersion();
if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
@ -85,10 +86,23 @@ public class FlinkArgsUtils {
args.add(taskManagerMemory);
}
if (StringUtils.isEmpty(others) || !others.contains(Constants.FLINK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) { // -yqu
args.add(Constants.FLINK_QUEUE);
args.add(queue);
}
}
args.add(Constants.FLINK_DETACH); //-d
}
// -p -s -yqu -yat -sae -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
@ -106,21 +120,6 @@ public class FlinkArgsUtils {
args.add(mainArgs);
}
// --files --conf --libjar ...
String others = param.getOthers();
String queue = param.getQueue();
if (StringUtils.isNotEmpty(others)) {
if (!others.contains(Constants.FLINK_QUEUE) && StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) {
args.add(Constants.FLINK_QUEUE);
args.add(param.getQueue());
}
args.add(others);
} else if (StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) {
args.add(Constants.FLINK_QUEUE);
args.add(param.getQueue());
}
return args;
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java

@ -42,6 +42,7 @@ public class FlinkTask extends AbstractYarnTask {
/**
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
private static final String FLINK_COMMAND = "flink";
private static final String FLINK_RUN = "run";
@ -102,6 +103,7 @@ public class FlinkTask extends AbstractYarnTask {
*/
@Override
protected String buildCommand() {
// flink run [OPTIONS] <jar-file> <arguments>
List<String> args = new ArrayList<>();
args.add(FLINK_COMMAND);

22
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@ -46,9 +46,9 @@ public class FlinkArgsUtilsTest {
public ProgramType programType = ProgramType.JAVA;
public String mainClass = "com.test";
public ResourceInfo mainJar = null;
public String mainArgs = "testArgs";
public String mainArgs = "testArgs --input file:///home";
public String queue = "queue1";
public String others = "--input file:///home";
public String others = "-p 4";
public String flinkVersion = "<1.10";
@ -109,20 +109,20 @@ public class FlinkArgsUtilsTest {
assertEquals("-ytm", result.get(10));
assertEquals(result.get(11),taskManagerMemory);
assertEquals("-d", result.get(12));
assertEquals("-yqu", result.get(12));
assertEquals(result.get(13),queue);
assertEquals("-c", result.get(13));
assertEquals(result.get(14),mainClass);
assertEquals("-d", result.get(14));
assertEquals(result.get(15),mainJar.getRes());
assertEquals(result.get(16),mainArgs);
assertEquals(result.get(15),others);
assertEquals("--qu", result.get(17));
assertEquals(result.get(18),queue);
assertEquals("-c", result.get(16));
assertEquals(result.get(17),mainClass);
assertEquals(result.get(19),others);
assertEquals(result.get(18),mainJar.getRes());
assertEquals(result.get(19),mainArgs);
//Others param without --qu
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();
param1.setQueue(queue);
param1.setDeployMode(mode);

4
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@ -62,7 +62,7 @@
</x-radio-group>
</div>
</m-list-box>
<m-list-box>
<m-list-box v-if="deployMode === 'cluster'">
<div slot="text">{{$t('Flink Version')}}</div>
<div slot="content">
<x-select
@ -78,7 +78,7 @@
</x-select>
</div>
</m-list-box>
<div class="list-box-4p">
<div class="list-box-4p" v-if="deployMode === 'cluster'">
<div class="clearfix list">
<span class="sp1" style="word-break:break-all">{{$t('jobManagerMemory')}}</span>
<span class="sp2">

2
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -535,7 +535,7 @@ export default {
'Execute time': '执行时间',
'Complement range': '补数范围',
slot: 'slot数量',
taskManager: 'taskManage数量',
taskManager: 'taskManager数量',
jobManagerMemory: 'jobManager内存数',
taskManagerMemory: 'taskManager内存数',
'Http Url': '请求地址',

2
pom.xml

@ -841,7 +841,7 @@
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/HostTest.java</include>
<!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>-->
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include>

Loading…
Cancel
Save