|
|
|
@ -21,57 +21,70 @@ import cn.escheduler.common.Constants;
|
|
|
|
|
import cn.escheduler.common.enums.ProgramType; |
|
|
|
|
import cn.escheduler.common.task.flink.FlinkParameters; |
|
|
|
|
import org.apache.commons.lang.StringUtils; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.List; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* spark args utils |
|
|
|
|
* spark args utils |
|
|
|
|
*/ |
|
|
|
|
public class FlinkArgsUtils { |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* build args |
|
|
|
|
* build args |
|
|
|
|
* |
|
|
|
|
* @param param |
|
|
|
|
* @return |
|
|
|
|
*/ |
|
|
|
|
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(FlinkArgsUtils.class); |
|
|
|
|
|
|
|
|
|
public static List<String> buildArgs(FlinkParameters param) { |
|
|
|
|
List<String> args = new ArrayList<>(); |
|
|
|
|
String deployMode = "cluster"; |
|
|
|
|
if (StringUtils.isNotEmpty(param.getDeployMode())) { |
|
|
|
|
deployMode = param.getDeployMode(); |
|
|
|
|
|
|
|
|
|
args.add(Constants.FLINK_RUN_MODE); //-m
|
|
|
|
|
} |
|
|
|
|
if (!"local".equals(deployMode)) { |
|
|
|
|
args.add(Constants.FLINK_RUN_MODE); //-m
|
|
|
|
|
|
|
|
|
|
args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster
|
|
|
|
|
args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster
|
|
|
|
|
|
|
|
|
|
if (param.getSlot() != 0) { |
|
|
|
|
args.add(Constants.FLINK_YARN_SLOT); |
|
|
|
|
args.add(String.format("%d", param.getSlot())); //-ys
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm
|
|
|
|
|
args.add(Constants.FLINK_APP_NAME); |
|
|
|
|
args.add(param.getAppName()); |
|
|
|
|
} |
|
|
|
|
if (param.getSlot() != 0) { |
|
|
|
|
args.add(Constants.FLINK_YARN_SLOT); |
|
|
|
|
args.add(String.format("%d", param.getSlot())); //-ys
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (param.getTaskManager() != 0) { //-yn
|
|
|
|
|
args.add(Constants.FLINK_TASK_MANAGE); |
|
|
|
|
args.add(String.format("%d", param.getTaskManager())); |
|
|
|
|
} |
|
|
|
|
if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm
|
|
|
|
|
args.add(Constants.FLINK_APP_NAME); |
|
|
|
|
args.add(param.getAppName()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(param.getJobManagerMemory())) { |
|
|
|
|
args.add(Constants.FLINK_JOB_MANAGE_MEM); |
|
|
|
|
args.add(param.getJobManagerMemory()); //-yjm
|
|
|
|
|
} |
|
|
|
|
if (param.getTaskManager() != 0) { //-yn
|
|
|
|
|
args.add(Constants.FLINK_TASK_MANAGE); |
|
|
|
|
args.add(String.format("%d", param.getTaskManager())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm
|
|
|
|
|
args.add(Constants.FLINK_TASK_MANAGE_MEM); |
|
|
|
|
args.add(param.getTaskManagerMemory()); |
|
|
|
|
} |
|
|
|
|
args.add(Constants.FLINK_detach); //-d
|
|
|
|
|
if (StringUtils.isNotEmpty(param.getJobManagerMemory())) { |
|
|
|
|
args.add(Constants.FLINK_JOB_MANAGE_MEM); |
|
|
|
|
args.add(param.getJobManagerMemory()); //-yjm
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm
|
|
|
|
|
args.add(Constants.FLINK_TASK_MANAGE_MEM); |
|
|
|
|
args.add(param.getTaskManagerMemory()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
args.add(Constants.FLINK_detach); //-d
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if(param.getProgramType() !=null ){ |
|
|
|
|
if(param.getProgramType()!=ProgramType.PYTHON){ |
|
|
|
|
if (param.getProgramType() != null) { |
|
|
|
|
if (param.getProgramType() != ProgramType.PYTHON) { |
|
|
|
|
if (StringUtils.isNotEmpty(param.getMainClass())) { |
|
|
|
|
args.add(Constants.FLINK_MAIN_CLASS); //-c
|
|
|
|
|
args.add(param.getMainClass()); //main class
|
|
|
|
@ -83,28 +96,29 @@ public class FlinkArgsUtils {
|
|
|
|
|
args.add(param.getMainJar().getRes()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(param.getMainArgs())) { |
|
|
|
|
args.add(param.getMainArgs()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// --files --conf --libjar ...
|
|
|
|
|
if (StringUtils.isNotEmpty(param.getOthers())) { |
|
|
|
|
if (StringUtils.isNotEmpty(param.getOthers())) { |
|
|
|
|
String others = param.getOthers(); |
|
|
|
|
if(!others.contains("--queue")){ |
|
|
|
|
if (StringUtils.isNotEmpty(param.getQueue())) { |
|
|
|
|
args.add(Constants.SPARK_QUEUE); |
|
|
|
|
if (!others.contains("--qu")) { |
|
|
|
|
if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { |
|
|
|
|
args.add(Constants.FLINK_QUEUE); |
|
|
|
|
args.add(param.getQueue()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
args.add(param.getOthers()); |
|
|
|
|
}else if (StringUtils.isNotEmpty(param.getQueue())) { |
|
|
|
|
args.add(Constants.SPARK_QUEUE); |
|
|
|
|
} else if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { |
|
|
|
|
args.add(Constants.FLINK_QUEUE); |
|
|
|
|
args.add(param.getQueue()); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(param.getMainArgs())) { |
|
|
|
|
args.add(param.getMainArgs()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return args; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |