diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 0297ae6534..4c43c0cd08 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -842,7 +842,9 @@ public final class Constants { */ public static final String HIVE_CONF = "hiveconf:"; - //flink ?? + /** + * flink + */ public static final String FLINK_YARN_CLUSTER = "yarn-cluster"; public static final String FLINK_RUN_MODE = "-m"; public static final String FLINK_YARN_SLOT = "-ys"; @@ -852,8 +854,9 @@ public final class Constants { public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; public static final String FLINK_TASK_MANAGE_MEM = "-ytm"; - public static final String FLINK_DETACH = "-d"; public static final String FLINK_MAIN_CLASS = "-c"; + public static final String FLINK_PARALLELISM = "-p"; + public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae"; public static final int[] NOT_TERMINATED_STATES = new int[] { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java index 1b1f0a6c5d..76d68c0460 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.task.flink; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import java.util.ArrayList; import java.util.List; @@ -29,201 +29,213 @@ import java.util.List; */ public class FlinkParameters extends AbstractParameters { - /** - * major jar - */ - private ResourceInfo mainJar; - - /** - * major class - */ - private String mainClass; - - /** - * deploy mode yarn-cluster yarn-client yarn-local - */ - private String deployMode; - - /** - * arguments - */ - private String mainArgs; - - /** - * slot count - */ - private int slot; - - /** - *Yarn application name - */ - - private String appName; - - /** - * taskManager count - */ - private int taskManager; - - /** - * job manager memory - */ - private String jobManagerMemory ; - - /** - * task manager memory - */ - private String taskManagerMemory; - - /** - * resource list - */ - private List resourceList = new ArrayList<>(); - - /** - * The YARN queue to submit to - */ - private String queue; - - /** - * other arguments - */ - private String others; - - /** - * flink version - */ - private String flinkVersion; - - /** - * program type - * 0 JAVA,1 SCALA,2 PYTHON - */ - private ProgramType programType; - - public ResourceInfo getMainJar() { - return mainJar; - } - - public void setMainJar(ResourceInfo mainJar) { - this.mainJar = mainJar; - } - - public String getMainClass() { - return mainClass; - } - - public void setMainClass(String mainClass) { - this.mainClass = mainClass; - } - - public String getDeployMode() { - return deployMode; - } - - public void setDeployMode(String deployMode) { - this.deployMode = deployMode; - } - - public String getMainArgs() { - return mainArgs; - } - - public void setMainArgs(String mainArgs) { - this.mainArgs = mainArgs; - } - - public int getSlot() { - return slot; - } - - public void setSlot(int slot) { - this.slot = slot; - } - - public String getAppName() { - return appName; - } - - public void setAppName(String appName) { - this.appName = appName; - } - - public int getTaskManager() { - return taskManager; - } - - public void setTaskManager(int taskManager) { - this.taskManager = taskManager; - } - - public String getJobManagerMemory() { - return jobManagerMemory; - } - - public void setJobManagerMemory(String jobManagerMemory) { - this.jobManagerMemory = jobManagerMemory; - } - - public String getTaskManagerMemory() { - return taskManagerMemory; - } - - public void setTaskManagerMemory(String taskManagerMemory) { - this.taskManagerMemory = taskManagerMemory; - } - - public String getQueue() { - return queue; - } - - public void setQueue(String queue) { - this.queue = queue; - } - - public List getResourceList() { - return resourceList; - } + /** + * major jar + */ + private ResourceInfo mainJar; + + /** + * major class + */ + private String mainClass; + + /** + * deploy mode yarn-cluster yarn-local + */ + private String deployMode; + + /** + * arguments + */ + private String mainArgs; + + /** + * slot count + */ + private int slot; + + /** + * parallelism + */ + private int parallelism; + + /** + * yarn application name + */ + private String appName; + + /** + * taskManager count + */ + private int taskManager; + + /** + * job manager memory + */ + private String jobManagerMemory; + + /** + * task manager memory + */ + private String taskManagerMemory; + + /** + * resource list + */ + private List resourceList = new ArrayList<>(); + + /** + * The YARN queue to submit to + */ + private String queue; + + /** + * other arguments + */ + private String others; + + /** + * flink version + */ + private String flinkVersion; + + /** + * program type + * 0 JAVA,1 SCALA,2 PYTHON + */ + private ProgramType programType; + + public ResourceInfo getMainJar() { + return mainJar; + } + + public void setMainJar(ResourceInfo mainJar) { + this.mainJar = mainJar; + } + + public String getMainClass() { + return mainClass; + } + + public void setMainClass(String mainClass) { + this.mainClass = mainClass; + } + + public String getDeployMode() { + return deployMode; + } + + public void setDeployMode(String deployMode) { + this.deployMode = deployMode; + } + + public String getMainArgs() { + return mainArgs; + } + + public void setMainArgs(String mainArgs) { + this.mainArgs = mainArgs; + } + + public int getSlot() { + return slot; + } + + public void setSlot(int slot) { + this.slot = slot; + } + + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } + + public int getTaskManager() { + return taskManager; + } - public void setResourceList(List resourceList) { - this.resourceList = resourceList; - } + public void setTaskManager(int taskManager) { + this.taskManager = taskManager; + } - public String getOthers() { - return others; - } + public String getJobManagerMemory() { + return jobManagerMemory; + } + + public void setJobManagerMemory(String jobManagerMemory) { + this.jobManagerMemory = jobManagerMemory; + } - public void setOthers(String others) { - this.others = others; - } + public String getTaskManagerMemory() { + return taskManagerMemory; + } - public ProgramType getProgramType() { - return programType; - } + public void setTaskManagerMemory(String taskManagerMemory) { + this.taskManagerMemory = taskManagerMemory; + } - public void setProgramType(ProgramType programType) { - this.programType = programType; - } + public String getQueue() { + return queue; + } - public String getFlinkVersion() { - return flinkVersion; - } + public void setQueue(String queue) { + this.queue = queue; + } - public void setFlinkVersion(String flinkVersion) { - this.flinkVersion = flinkVersion; - } + public List getResourceList() { + return resourceList; + } - @Override - public boolean checkParameters() { - return mainJar != null && programType != null; - } + public void setResourceList(List resourceList) { + this.resourceList = resourceList; + } + public String getOthers() { + return others; + } - @Override - public List getResourceFilesList() { - if (mainJar != null && !resourceList.contains(mainJar)) { - resourceList.add(mainJar); + public void setOthers(String others) { + this.others = others; } - return resourceList; - } + + public ProgramType getProgramType() { + return programType; + } + + public void setProgramType(ProgramType programType) { + this.programType = programType; + } + + public String getFlinkVersion() { + return flinkVersion; + } + + public void setFlinkVersion(String flinkVersion) { + this.flinkVersion = flinkVersion; + } + + @Override + public boolean checkParameters() { + return mainJar != null && programType != null; + } + + @Override + public List getResourceFilesList() { + if (mainJar != null && !resourceList.contains(mainJar)) { + resourceList.add(mainJar); + } + return resourceList; + } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index 519ddf205c..2d5198c227 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -17,11 +17,11 @@ package org.apache.dolphinscheduler.server.utils; -import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; +import org.apache.dolphinscheduler.common.utils.StringUtils; import java.util.ArrayList; import java.util.List; @@ -53,7 +53,7 @@ public class FlinkArgsUtils { args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster int slot = param.getSlot(); - if (slot != 0) { + if (slot > 0) { args.add(Constants.FLINK_YARN_SLOT); args.add(String.format("%d", slot)); //-ys } @@ -68,7 +68,7 @@ public class FlinkArgsUtils { String flinkVersion = param.getFlinkVersion(); if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) { int taskManager = param.getTaskManager(); - if (taskManager != 0) { //-yn + if (taskManager > 0) { //-yn args.add(Constants.FLINK_TASK_MANAGE); args.add(String.format("%d", taskManager)); } @@ -92,12 +92,19 @@ public class FlinkArgsUtils { args.add(queue); } } + } - args.add(Constants.FLINK_DETACH); //-d - + int parallelism = param.getParallelism(); + if (parallelism > 0) { + args.add(Constants.FLINK_PARALLELISM); + args.add(String.format("%d", parallelism)); // -p } - // -p -s -yqu -yat -sae -yD -D + // If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly + // The task status will be synchronized with the cluster job status + args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae + + // -s -yqu -yat -yD -D if (StringUtils.isNotEmpty(others)) { args.add(others); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java index bea6775e5c..f03062835e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java @@ -17,19 +17,20 @@ package org.apache.dolphinscheduler.server.utils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; + +import java.util.List; + import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - /** * Test FlinkArgsUtils */ @@ -39,6 +40,7 @@ public class FlinkArgsUtilsTest { public String mode = "cluster"; public int slot = 2; + public int parallelism = 3; public String appName = "testFlink"; public int taskManager = 4; public String taskManagerMemory = "2G"; @@ -48,7 +50,7 @@ public class FlinkArgsUtilsTest { public ResourceInfo mainJar = null; public String mainArgs = "testArgs --input file:///home"; public String queue = "queue1"; - public String others = "-p 4"; + public String others = "-s hdfs:///flink/savepoint-1537"; public String flinkVersion = "<1.10"; @@ -72,6 +74,7 @@ public class FlinkArgsUtilsTest { param.setMainClass(mainClass); param.setAppName(appName); param.setSlot(slot); + param.setParallelism(parallelism); param.setTaskManager(taskManager); param.setJobManagerMemory(jobManagerMemory); param.setTaskManagerMemory(taskManagerMemory); @@ -89,7 +92,7 @@ public class FlinkArgsUtilsTest { } //Expected values and order - assertEquals(20, result.size()); + assertEquals(22, result.size()); assertEquals("-m", result.get(0)); assertEquals("yarn-cluster", result.get(1)); @@ -112,15 +115,18 @@ public class FlinkArgsUtilsTest { assertEquals("-yqu", result.get(12)); assertEquals(result.get(13),queue); - assertEquals("-d", result.get(14)); + assertEquals("-p", result.get(14)); + assertSame(Integer.valueOf(result.get(15)),parallelism); + + assertEquals("-sae", result.get(16)); - assertEquals(result.get(15),others); + assertEquals(result.get(17),others); - assertEquals("-c", result.get(16)); - assertEquals(result.get(17),mainClass); + assertEquals("-c", result.get(18)); + assertEquals(result.get(19),mainClass); - assertEquals(result.get(18),mainJar.getRes()); - assertEquals(result.get(19),mainArgs); + assertEquals(result.get(20),mainJar.getRes()); + assertEquals(result.get(21),mainArgs); //Others param without -yqu FlinkParameters param1 = new FlinkParameters(); diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue index d0dfdf6d97..d328c5f8b2 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -80,7 +80,7 @@ - +
{{$t('App Name')}}
-
+
{{$t('JobManager Memory')}}
@@ -136,6 +136,18 @@
+ +
{{$t('Parallelism')}}
+
+ + +
+
{{$t('Main Arguments')}}
@@ -215,6 +227,8 @@ localParams: [], // Slot number slot: 1, + // Parallelism + parallelism: 1, // TaskManager mumber taskManager: '2', // JobManager memory @@ -320,6 +334,11 @@ return false } + if (!Number.isInteger(parseInt(this.parallelism))) { + this.$message.warning(`${i18n.$t('Please enter Parallelism')}`) + return false + } + if (this.flinkVersion === '<1.10' && !Number.isInteger(parseInt(this.taskManager))) { this.$message.warning(`${i18n.$t('Please enter TaskManager number')}`) return false @@ -349,6 +368,7 @@ localParams: this.localParams, flinkVersion: this.flinkVersion, slot: this.slot, + parallelism: this.parallelism, taskManager: this.taskManager, jobManagerMemory: this.jobManagerMemory, taskManagerMemory: this.taskManagerMemory, @@ -485,6 +505,7 @@ resourceList: this.resourceIdArr, localParams: this.localParams, slot: this.slot, + parallelism: this.parallelism, taskManager: this.taskManager, jobManagerMemory: this.jobManagerMemory, taskManagerMemory: this.taskManagerMemory, @@ -516,6 +537,7 @@ this.deployMode = o.params.deployMode || '' this.flinkVersion = o.params.flinkVersion || '<1.10' this.slot = o.params.slot || 1 + this.parallelism = o.params.parallelism || 1 this.taskManager = o.params.taskManager || '2' this.jobManagerMemory = o.params.jobManagerMemory || '1G' this.taskManagerMemory = o.params.taskManagerMemory || '2G' diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index cf90cad00b..3c16553d64 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -123,6 +123,8 @@ export default { 'Please enter TaskManager memory': 'Please enter TaskManager memory', 'Slot Number': 'Slot Number', 'Please enter Slot number': 'Please enter Slot number', + Parallelism: 'Parallelism', + 'Please enter Parallelism': 'Please enter Parallelism', 'TaskManager Number': 'TaskManager Number', 'Please enter TaskManager number': 'Please enter TaskManager number', 'App Name': 'App Name', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index c7e87de125..22aef018f4 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -123,6 +123,8 @@ export default { 'Please enter TaskManager memory': '请输入TaskManager内存数', 'Slot Number': 'Slot数量', 'Please enter Slot number': '请输入Slot数量', + Parallelism: '并行度', + 'Please enter Parallelism': '请输入并行度', 'TaskManager Number': 'TaskManager数量', 'Please enter TaskManager number': '请输入TaskManager数量', 'App Name': '任务名称',