Browse Source

[1.3.6-prepare][Feature][Flink] Support name and parallelism input #4285 #4937 (#4976)

Shiwen Cheng 4 years ago committed by GitHub
parent
commit
505da21fc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 27
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
  3. 20
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
  4. 32
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
  5. 43
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
  6. 4
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  7. 4
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -828,8 +828,9 @@ public final class Constants {
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
public static final String FLINK_DETACH = "-d";
public static final String FLINK_MAIN_CLASS = "-c";
public static final String FLINK_PARALLELISM = "-p";
public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
public static final int[] NOT_TERMINATED_STATES = new int[]{

27
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.flink;
import org.apache.dolphinscheduler.common.enums.ProgramType;
@ -39,8 +40,8 @@ public class FlinkParameters extends AbstractParameters {
private String mainClass;
/**
* deploy mode yarn-cluster yarn-client yarn-local
*/
* deploy mode yarn-cluster yarn-local
*/
private String deployMode;
/**
@ -54,25 +55,29 @@ public class FlinkParameters extends AbstractParameters {
private int slot;
/**
*Yarn application name
* parallelism
*/
private int parallelism;
/**
* yarn application name
*/
private String appName;
/**
* taskManager count
*/
private int taskManager;
private int taskManager;
/**
* job manager memory
*/
private String jobManagerMemory ;
private String jobManagerMemory;
/**
* task manager memory
*/
private String taskManagerMemory;
private String taskManagerMemory;
/**
* resource list
@ -140,6 +145,14 @@ public class FlinkParameters extends AbstractParameters {
this.slot = slot;
}
public int getParallelism() {
return parallelism;
}
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
public String getAppName() {
return appName;
}
@ -217,7 +230,6 @@ public class FlinkParameters extends AbstractParameters {
return mainJar != null && programType != null;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
@ -225,4 +237,5 @@ public class FlinkParameters extends AbstractParameters {
}
return resourceList;
}
}

20
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@ -17,11 +17,11 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
@ -53,7 +53,7 @@ public class FlinkArgsUtils {
args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster
int slot = param.getSlot();
if (slot != 0) {
if (slot > 0) {
args.add(Constants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); //-ys
}
@ -68,7 +68,7 @@ public class FlinkArgsUtils {
String flinkVersion = param.getFlinkVersion();
if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
if (taskManager != 0) { //-yn
if (taskManager > 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
@ -92,12 +92,19 @@ public class FlinkArgsUtils {
args.add(queue);
}
}
}
args.add(Constants.FLINK_DETACH); //-d
int parallelism = param.getParallelism();
if (parallelism > 0) {
args.add(Constants.FLINK_PARALLELISM);
args.add(String.format("%d", parallelism)); // -p
}
// -p -s -yqu -yat -sae -yD -D
// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
// The task status will be synchronized with the cluster job status
args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
@ -122,5 +129,4 @@ public class FlinkArgsUtils {
return args;
}
}

32
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@ -17,19 +17,20 @@
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
/**
* Test FlinkArgsUtils
*/
@ -39,6 +40,7 @@ public class FlinkArgsUtilsTest {
public String mode = "cluster";
public int slot = 2;
public int parallelism = 3;
public String appName = "testFlink";
public int taskManager = 4;
public String taskManagerMemory = "2G";
@ -48,7 +50,7 @@ public class FlinkArgsUtilsTest {
public ResourceInfo mainJar = null;
public String mainArgs = "testArgs --input file:///home";
public String queue = "queue1";
public String others = "-p 4";
public String others = "-s hdfs:///flink/savepoint-1537";
public String flinkVersion = "<1.10";
@ -72,6 +74,7 @@ public class FlinkArgsUtilsTest {
param.setMainClass(mainClass);
param.setAppName(appName);
param.setSlot(slot);
param.setParallelism(parallelism);
param.setTaskManager(taskManager);
param.setJobManagerMemory(jobManagerMemory);
param.setTaskManagerMemory(taskManagerMemory);
@ -89,7 +92,7 @@ public class FlinkArgsUtilsTest {
}
//Expected values and order
assertEquals(20, result.size());
assertEquals(22, result.size());
assertEquals("-m", result.get(0));
assertEquals("yarn-cluster", result.get(1));
@ -112,15 +115,18 @@ public class FlinkArgsUtilsTest {
assertEquals("-yqu", result.get(12));
assertEquals(result.get(13),queue);
assertEquals("-d", result.get(14));
assertEquals("-p", result.get(14));
assertSame(Integer.valueOf(result.get(15)),parallelism);
assertEquals("-sae", result.get(16));
assertEquals(result.get(15),others);
assertEquals(result.get(17),others);
assertEquals("-c", result.get(16));
assertEquals(result.get(17),mainClass);
assertEquals("-c", result.get(18));
assertEquals(result.get(19),mainClass);
assertEquals(result.get(18),mainJar.getRes());
assertEquals(result.get(19),mainArgs);
assertEquals(result.get(20),mainJar.getRes());
assertEquals(result.get(21),mainArgs);
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();

43
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@ -79,6 +79,18 @@
</x-select>
</div>
</m-list-box>
<m-list-box v-if="deployMode === 'cluster'">
<div slot="text">{{$t('App Name')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="input"
v-model="appName"
:placeholder="$t('Please enter app name(optional)')"
autocomplete="off">
</x-input>
</div>
</m-list-box>
<div class="list-box-4p" v-if="deployMode === 'cluster'">
<div class="clearfix list">
<span class="sp1" style="word-break:break-all">{{$t('JobManager Memory')}}</span>
@ -129,6 +141,21 @@
</span>
</div>
</div>
<div class="list-box-4p">
<div class="clearfix list">
<span class="sp1" style="word-break:break-all">{{$t('Parallelism')}}</span>
<span class="sp2">
<x-input
:disabled="isDetails"
type="input"
v-model="parallelism"
:placeholder="$t('Please enter Parallelism')"
style="width: 200px;"
autocomplete="off">
</x-input>
</span>
</div>
</div>
<m-list-box>
<div slot="text">{{$t('Main Arguments')}}</div>
<div slot="content">
@ -209,12 +236,16 @@
localParams: [],
// Slot number
slot: 1,
// Parallelism
parallelism: 1,
// TaskManager mumber
taskManager: '2',
// JobManager memory
jobManagerMemory: '1G',
// TaskManager memory
taskManagerMemory: '2G',
// Flink app name
appName: '',
// Main arguments
mainArgs: '',
// Option parameters
@ -322,6 +353,11 @@
return false
}
if (!Number.isInteger(parseInt(this.parallelism))) {
this.$message.warning(`${i18n.$t('Please enter Parallelism')}`)
return false
}
if (this.flinkVersion === '<1.10' && !Number.isInteger(parseInt(this.taskManager))) {
this.$message.warning(`${i18n.$t('Please enter TaskManager number')}`)
return false
@ -351,9 +387,11 @@
localParams: this.localParams,
flinkVersion: this.flinkVersion,
slot: this.slot,
parallelism: this.parallelism,
taskManager: this.taskManager,
jobManagerMemory: this.jobManagerMemory,
taskManagerMemory: this.taskManagerMemory,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType
@ -481,9 +519,11 @@
localParams: this.localParams,
flinkVersion: this.flinkVersion,
slot: this.slot,
parallelism: this.parallelism,
taskManager: this.taskManager,
jobManagerMemory: this.jobManagerMemory,
taskManagerMemory: this.taskManagerMemory,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType
@ -522,10 +562,11 @@
this.deployMode = o.params.deployMode || ''
this.flinkVersion = o.params.flinkVersion || '<1.10'
this.slot = o.params.slot || 1
this.parallelism = o.params.parallelism || 1
this.taskManager = o.params.taskManager || '2'
this.jobManagerMemory = o.params.jobManagerMemory || '1G'
this.taskManagerMemory = o.params.taskManagerMemory || '2G'
this.appName = o.params.appName || ''
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'SCALA'

4
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -118,8 +118,12 @@ export default {
'Please enter TaskManager memory': 'Please enter TaskManager memory',
'Slot Number': 'Slot Number',
'Please enter Slot number': 'Please enter Slot number',
Parallelism: 'Parallelism',
'Please enter Parallelism': 'Please enter Parallelism',
'TaskManager Number': 'TaskManager Number',
'Please enter TaskManager number': 'Please enter TaskManager number',
'App Name': 'App Name',
'Please enter app name(optional)': 'Please enter app name(optional)',
'SQL Type': 'SQL Type',
Title: 'Title',
'Please enter the title of email': 'Please enter the title of email',

4
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -118,8 +118,12 @@ export default {
'Please enter TaskManager memory': '请输入TaskManager内存数',
'Slot Number': 'Slot数量',
'Please enter Slot number': '请输入Slot数量',
Parallelism: '并行度',
'Please enter Parallelism': '请输入并行度',
'TaskManager Number': 'TaskManager数量',
'Please enter TaskManager number': '请输入TaskManager数量',
'App Name': '任务名称',
'Please enter app name(optional)': '请输入任务名称(选填)',
'SQL Type': 'sql类型',
Title: '主题',
'Please enter the title of email': '请输入邮件主题',

Loading…
Cancel
Save