diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index 10a4460678..7eaa6b7926 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -906,4 +906,18 @@ public final class Constants { * hive conf */ public static final String HIVE_CONF = "hiveconf:"; + + //flink 任务 + public static final String FLINK_YARN_CLUSTER = "yarn-cluster"; + public static final String FLINK_RUN_MODE = "-m"; + public static final String FLINK_YARN_SLOT = "-ys"; + public static final String FLINK_APP_NAME = "-ynm"; + public static final String FLINK_TASK_MANAGE = "-yn"; + + public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; + public static final String FLINK_TASK_MANAGE_MEM = "-ytm"; + public static final String FLINK_detach = "-d"; + public static final String FLINK_MAIN_CLASS = "-c"; + + } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java index 1d589167e3..7e4fde6a34 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java @@ -29,8 +29,9 @@ public enum TaskType { * 5 SPARK * 6 PYTHON * 7 DEPENDENT + * 8 FLINK */ - SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT; + SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT,FLINK; public static boolean typeIsNormalTask(String typeName) { TaskType taskType = TaskType.valueOf(typeName); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/task/flink/FlinkParameters.java b/escheduler-common/src/main/java/cn/escheduler/common/task/flink/FlinkParameters.java new file mode 100644 index 0000000000..54dfcb7103 --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/task/flink/FlinkParameters.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.task.flink; + +import cn.escheduler.common.enums.ProgramType; +import cn.escheduler.common.process.ResourceInfo; +import cn.escheduler.common.task.AbstractParameters; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * spark parameters + */ +public class FlinkParameters extends AbstractParameters { + + /** + * major jar + */ + private ResourceInfo mainJar; + + /** + * major class + */ + private String mainClass; + + /** + * deploy mode yarn-cluster yarn-client yarn-local + */ + private String deployMode; + + /** + * arguments + */ + private String mainArgs; + + /** + * slot个数 + */ + private int slot; + + /** + *Yarn application的名字 + */ + + private String appName; + + /** + * taskManager 数量 + */ + private int taskManager; + + /** + * jobManagerMemory 内存大小 + */ + private String jobManagerMemory ; + + /** + * taskManagerMemory内存大小 + */ + private String taskManagerMemory; + + /** + * resource list + */ + private List resourceList; + + /** + * The YARN queue to submit to + */ + private String queue; + + /** + * other arguments + */ + private String others; + + /** + * program type + * 0 JAVA,1 SCALA,2 PYTHON + */ + private ProgramType programType; + + public ResourceInfo getMainJar() { + return mainJar; + } + + public void setMainJar(ResourceInfo mainJar) { + this.mainJar = mainJar; + } + + public String getMainClass() { + return mainClass; + } + + public void setMainClass(String mainClass) { + this.mainClass = mainClass; + } + + public String getDeployMode() { + return deployMode; + } + + public void setDeployMode(String deployMode) { + this.deployMode = deployMode; + } + + public String getMainArgs() { + return mainArgs; + } + + public void setMainArgs(String mainArgs) { + this.mainArgs = mainArgs; + } + + public int getSlot() { + return slot; + } + + public void setSlot(int slot) { + this.slot = slot; + } + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } + + public int getTaskManager() { + return taskManager; + } + + public void setTaskManager(int taskManager) { + this.taskManager = taskManager; + } + + public String getJobManagerMemory() { + return jobManagerMemory; + } + + public void setJobManagerMemory(String jobManagerMemory) { + this.jobManagerMemory = jobManagerMemory; + } + + public String getTaskManagerMemory() { + return taskManagerMemory; + } + + public void setTaskManagerMemory(String taskManagerMemory) { + this.taskManagerMemory = taskManagerMemory; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public List getResourceList() { + return resourceList; + } + + public void setResourceList(List resourceList) { + this.resourceList = resourceList; + } + + public String getOthers() { + return others; + } + + public void setOthers(String others) { + this.others = others; + } + + public ProgramType getProgramType() { + return programType; + } + + public void setProgramType(ProgramType programType) { + this.programType = programType; + } + + @Override + public boolean checkParameters() { + return mainJar != null && programType != null; + } + + + @Override + public List getResourceFilesList() { + if(resourceList !=null ) { + this.resourceList.add(mainJar); + return resourceList.stream() + .map(p -> p.getRes()).collect(Collectors.toList()); + } + return null; + } + + +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/TaskParametersUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/TaskParametersUtils.java index feff4141da..c8ceeb44a2 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/utils/TaskParametersUtils.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/TaskParametersUtils.java @@ -19,6 +19,7 @@ package cn.escheduler.common.utils; import cn.escheduler.common.enums.TaskType; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.dependent.DependentParameters; +import cn.escheduler.common.task.flink.FlinkParameters; import cn.escheduler.common.task.mr.MapreduceParameters; import cn.escheduler.common.task.procedure.ProcedureParameters; import cn.escheduler.common.task.python.PythonParameters; @@ -63,6 +64,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, PythonParameters.class); case DEPENDENT: return JSONUtils.parseObject(parameter, DependentParameters.class); + case FLINK: + return JSONUtils.parseObject(parameter, FlinkParameters.class); default: return null; } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/utils/FlinkArgsUtils.java b/escheduler-server/src/main/java/cn/escheduler/server/utils/FlinkArgsUtils.java new file mode 100644 index 0000000000..308103073d --- /dev/null +++ b/escheduler-server/src/main/java/cn/escheduler/server/utils/FlinkArgsUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.server.utils; + + +import cn.escheduler.common.Constants; +import cn.escheduler.common.enums.ProgramType; +import cn.escheduler.common.task.flink.FlinkParameters; +import org.apache.commons.lang.StringUtils; + +import java.util.ArrayList; +import java.util.List; + + +/** + * spark args utils + */ +public class FlinkArgsUtils { + + /** + * build args + * @param param + * @return + */ + public static List buildArgs(FlinkParameters param) { + List args = new ArrayList<>(); + + args.add(Constants.FLINK_RUN_MODE); //-m + + args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster + + if (param.getSlot() != 0) { + args.add(Constants.FLINK_YARN_SLOT); + args.add(String.format("%d", param.getSlot())); //-ys + } + + if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm + args.add(Constants.FLINK_APP_NAME); + args.add(param.getAppName()); + } + + if (param.getTaskManager() != 0) { //-yn + args.add(Constants.FLINK_TASK_MANAGE); + args.add(String.format("%d", param.getTaskManager())); + } + + if (StringUtils.isNotEmpty(param.getJobManagerMemory())) { + args.add(Constants.FLINK_JOB_MANAGE_MEM); + args.add(param.getJobManagerMemory()); //-yjm + } + + if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm + args.add(Constants.FLINK_TASK_MANAGE_MEM); + args.add(param.getTaskManagerMemory()); + } + args.add(Constants.FLINK_detach); //-d + + + if(param.getProgramType() !=null ){ + if(param.getProgramType()!=ProgramType.PYTHON){ + if (StringUtils.isNotEmpty(param.getMainClass())) { + args.add(Constants.FLINK_MAIN_CLASS); //-c + args.add(param.getMainClass()); //main class + } + } + } + + if (param.getMainJar() != null) { + args.add(param.getMainJar().getRes()); + } + + + // --files --conf --libjar ... + if (StringUtils.isNotEmpty(param.getOthers())) { + String others = param.getOthers(); + if(!others.contains("--queue")){ + if (StringUtils.isNotEmpty(param.getQueue())) { + args.add(Constants.SPARK_QUEUE); + args.add(param.getQueue()); + } + } + args.add(param.getOthers()); + }else if (StringUtils.isNotEmpty(param.getQueue())) { + args.add(Constants.SPARK_QUEUE); + args.add(param.getQueue()); + + } + + if (StringUtils.isNotEmpty(param.getMainArgs())) { + args.add(param.getMainArgs()); + } + + return args; + } + +} diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java index 213f4fd3f9..6472873d8b 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java @@ -22,6 +22,7 @@ import cn.escheduler.common.enums.TaskRecordStatus; import cn.escheduler.common.enums.TaskType; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; +import cn.escheduler.common.task.flink.FlinkParameters; import cn.escheduler.common.task.mr.MapreduceParameters; import cn.escheduler.common.task.procedure.ProcedureParameters; import cn.escheduler.common.task.python.PythonParameters; @@ -178,6 +179,8 @@ public abstract class AbstractTask { case SPARK: paramsClass = SparkParameters.class; break; + case FLINK: + paramsClass = FlinkParameters.class; case PYTHON: paramsClass = PythonParameters.class; break; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskManager.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskManager.java index e23a29ae08..986a6179c9 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskManager.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskManager.java @@ -19,6 +19,7 @@ package cn.escheduler.server.worker.task; import cn.escheduler.common.enums.TaskType; import cn.escheduler.server.worker.task.dependent.DependentTask; +import cn.escheduler.server.worker.task.flink.FlinkTask; import cn.escheduler.server.worker.task.mr.MapReduceTask; import cn.escheduler.server.worker.task.processdure.ProcedureTask; import cn.escheduler.server.worker.task.python.PythonTask; @@ -55,6 +56,8 @@ public class TaskManager { return new MapReduceTask(props, logger); case SPARK: return new SparkTask(props, logger); + case FLINK: + return new FlinkTask(props, logger); case PYTHON: return new PythonTask(props, logger); case DEPENDENT: diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/flink/FlinkTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/flink/FlinkTask.java new file mode 100644 index 0000000000..bf6f0cc9fb --- /dev/null +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/flink/FlinkTask.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.server.worker.task.flink; + +import cn.escheduler.common.process.Property; +import cn.escheduler.common.task.AbstractParameters; +import cn.escheduler.common.task.flink.FlinkParameters; +import cn.escheduler.common.utils.JSONUtils; +import cn.escheduler.common.utils.ParameterUtils; +import cn.escheduler.dao.model.ProcessInstance; +import cn.escheduler.server.utils.FlinkArgsUtils; +import cn.escheduler.server.utils.ParamUtils; +import cn.escheduler.server.worker.task.AbstractYarnTask; +import cn.escheduler.server.worker.task.TaskProps; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * flink task + */ +public class FlinkTask extends AbstractYarnTask { + + /** + * flink command + */ + private static final String FLINK_COMMAND = "flink"; + private static final String FLINK_RUN = "run"; + + /** + * flink parameters + */ + private FlinkParameters flinkParameters; + + public FlinkTask(TaskProps props, Logger logger) { + super(props, logger); + } + + @Override + public void init() { + + logger.info("flink task params {}", taskProps.getTaskParams()); + + flinkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), FlinkParameters.class); + + if (!flinkParameters.checkParameters()) { + throw new RuntimeException("flink task params is not valid"); + } + flinkParameters.setQueue(taskProps.getQueue()); + + if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { + String args = flinkParameters.getMainArgs(); + // get process instance by task instance id + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + + /** + * combining local and global parameters + */ + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + flinkParameters.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), + processInstance.getScheduleTime()); + + logger.info("param Map : {}", paramsMap); + if (paramsMap != null ){ + + args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); + logger.info("param args : {}", args); + } + flinkParameters.setMainArgs(args); + } + } + + /** + * create command + * @return + */ + @Override + protected String buildCommand() { + List args = new ArrayList<>(); + + args.add(FLINK_COMMAND); + args.add(FLINK_RUN); + logger.info("flink task args : {}", args); + // other parameters + args.addAll(FlinkArgsUtils.buildArgs(flinkParameters)); + + String command = ParameterUtils + .convertParameterPlaceholders(String.join(" ", args), taskProps.getDefinedParams()); + + logger.info("flink task command : {}", command); + + return command; + } + + @Override + public AbstractParameters getParameters() { + return flinkParameters; + } +} diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js index 9c1065870f..5305bf7476 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -260,6 +260,10 @@ let tasksType = { desc: 'SPARK', color: '#E46F13' }, + 'FLINK': { + desc: 'FLINK', + color: '#E46F13' + }, 'MR': { desc: 'MapReduce', color: '#A0A5CC' diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss index 95592e0754..37d3acaa19 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -70,6 +70,9 @@ .icos-SPARK { background: url("../img/toolbar_SPARK.png") no-repeat 50% 50%; } + .icos-FLINK { + background: url("../img/toobar_flink.svg") no-repeat 50% 50%; + } .icos-MR { background: url("../img/toolbar_MR.png") no-repeat 50% 50%; } diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index 863a44abf5..a46b894d11 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -19,13 +19,13 @@
@@ -52,13 +52,13 @@
@@ -96,68 +96,74 @@ + ref="timeout" + :backfill-item="backfillItem" + @on-timeout="_onTimeout"> + v-if="taskType === 'SHELL'" + @on-params="_onParams" + ref="SHELL" + :backfill-item="backfillItem"> + v-if="taskType === 'SUB_PROCESS'" + @on-params="_onParams" + @on-set-process-name="_onSetProcessName" + ref="SUB_PROCESS" + :backfill-item="backfillItem"> + v-if="taskType === 'PROCEDURE'" + @on-params="_onParams" + ref="PROCEDURE" + :backfill-item="backfillItem"> + v-if="taskType === 'SQL'" + @on-params="_onParams" + ref="SQL" + :create-node-id="id" + :backfill-item="backfillItem"> + v-if="taskType === 'SPARK'" + @on-params="_onParams" + ref="SPARK" + :backfill-item="backfillItem"> + + + v-if="taskType === 'MR'" + @on-params="_onParams" + ref="MR" + :backfill-item="backfillItem"> + v-if="taskType === 'PYTHON'" + @on-params="_onParams" + ref="PYTHON" + :backfill-item="backfillItem"> + v-if="taskType === 'DEPENDENT'" + @on-dependent="_onDependent" + ref="DEPENDENT" + :backfill-item="backfillItem"> @@ -178,6 +184,7 @@ import i18n from '@/module/i18n' import mShell from './tasks/shell' import mSpark from './tasks/spark' + import mFlink from './tasks/flink' import mPython from './tasks/python' import JSP from './../plugIn/jsPlumbHandle' import mProcedure from './tasks/procedure' @@ -284,12 +291,12 @@ } this.store.dispatch('dag/getSubProcessId', { taskId: stateId }).then(res => { this.$emit('onSubProcess', { - subProcessId: res.data.subProcessInstanceId, - fromThis: this - }) - }).catch(e => { - this.$message.error(e.msg || '') + subProcessId: res.data.subProcessInstanceId, + fromThis: this }) + }).catch(e => { + this.$message.error(e.msg || '') + }) } else { this.$emit('onSubProcess', { subProcessId: this.backfillItem.params.processDefinitionId, @@ -413,10 +420,10 @@ if (taskList.length) { taskList.forEach(v => { if (v.id === this.id) { - o = v - this.backfillItem = v - } - }) + o = v + this.backfillItem = v + } + }) // Non-null objects represent backfill if (!_.isEmpty(o)) { this.name = o.name @@ -455,6 +462,7 @@ mSql, mLog, mSpark, + mFlink, mPython, mDependent, mSelectInput, diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue new file mode 100644 index 0000000000..1c3b18a64c --- /dev/null +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -0,0 +1,388 @@ + + + + diff --git a/escheduler-ui/src/js/conf/home/pages/dag/img/toobar_flink.svg b/escheduler-ui/src/js/conf/home/pages/dag/img/toobar_flink.svg new file mode 100644 index 0000000000..33ba8b7b3d --- /dev/null +++ b/escheduler-ui/src/js/conf/home/pages/dag/img/toobar_flink.svg @@ -0,0 +1,211 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js index 7ac3dda87e..cfa0224185 100644 --- a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -476,5 +476,9 @@ export default { 'warning of timeout': '超时告警', 'Next five execution times': '接下来五次执行时间', 'Execute time': '执行时间', - 'Complement range': '补数范围' + 'Complement range': '补数范围', + 'slot':'slot数量', + 'taskManager':'taskManage数量', + 'jobManagerMemory':'jobManager内存数', + 'taskManagerMemory':'taskManager内存数' } diff --git a/escheduler-ui/src/view/docs/zh_CN/_book/images/flink_edit.png b/escheduler-ui/src/view/docs/zh_CN/_book/images/flink_edit.png new file mode 100644 index 0000000000..b7c2321157 Binary files /dev/null and b/escheduler-ui/src/view/docs/zh_CN/_book/images/flink_edit.png differ diff --git a/script/env/.escheduler_env.sh b/script/env/.escheduler_env.sh index 5a08343c84..e1975816d9 100644 --- a/script/env/.escheduler_env.sh +++ b/script/env/.escheduler_env.sh @@ -5,5 +5,5 @@ export SPARK_HOME2=/opt/soft/spark2 export PYTHON_HOME=/opt/soft/python export JAVA_HOME=/opt/soft/java export HIVE_HOME=/opt/soft/hive - -export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH \ No newline at end of file +export FLINK_HOME=/opt/soft/flink +export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$PATH