lidongdai
5 years ago
114 changed files with 3135 additions and 594 deletions
@ -0,0 +1,219 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package cn.escheduler.common.task.flink; |
||||||
|
|
||||||
|
import cn.escheduler.common.enums.ProgramType; |
||||||
|
import cn.escheduler.common.process.ResourceInfo; |
||||||
|
import cn.escheduler.common.task.AbstractParameters; |
||||||
|
|
||||||
|
import java.util.List; |
||||||
|
import java.util.stream.Collectors; |
||||||
|
|
||||||
|
/** |
||||||
|
* spark parameters |
||||||
|
*/ |
||||||
|
public class FlinkParameters extends AbstractParameters { |
||||||
|
|
||||||
|
/** |
||||||
|
* major jar |
||||||
|
*/ |
||||||
|
private ResourceInfo mainJar; |
||||||
|
|
||||||
|
/** |
||||||
|
* major class
|
||||||
|
*/ |
||||||
|
private String mainClass; |
||||||
|
|
||||||
|
/** |
||||||
|
* deploy mode yarn-cluster yarn-client yarn-local |
||||||
|
*/ |
||||||
|
private String deployMode; |
||||||
|
|
||||||
|
/** |
||||||
|
* arguments |
||||||
|
*/ |
||||||
|
private String mainArgs; |
||||||
|
|
||||||
|
/** |
||||||
|
* slot个数 |
||||||
|
*/ |
||||||
|
private int slot; |
||||||
|
|
||||||
|
/** |
||||||
|
*Yarn application的名字 |
||||||
|
*/ |
||||||
|
|
||||||
|
private String appName; |
||||||
|
|
||||||
|
/** |
||||||
|
* taskManager 数量 |
||||||
|
*/ |
||||||
|
private int taskManager; |
||||||
|
|
||||||
|
/** |
||||||
|
* jobManagerMemory 内存大小 |
||||||
|
*/ |
||||||
|
private String jobManagerMemory ; |
||||||
|
|
||||||
|
/** |
||||||
|
* taskManagerMemory内存大小 |
||||||
|
*/ |
||||||
|
private String taskManagerMemory; |
||||||
|
|
||||||
|
/** |
||||||
|
* resource list |
||||||
|
*/ |
||||||
|
private List<ResourceInfo> resourceList; |
||||||
|
|
||||||
|
/** |
||||||
|
* The YARN queue to submit to |
||||||
|
*/ |
||||||
|
private String queue; |
||||||
|
|
||||||
|
/** |
||||||
|
* other arguments |
||||||
|
*/ |
||||||
|
private String others; |
||||||
|
|
||||||
|
/** |
||||||
|
* program type |
||||||
|
* 0 JAVA,1 SCALA,2 PYTHON |
||||||
|
*/ |
||||||
|
private ProgramType programType; |
||||||
|
|
||||||
|
public ResourceInfo getMainJar() { |
||||||
|
return mainJar; |
||||||
|
} |
||||||
|
|
||||||
|
public void setMainJar(ResourceInfo mainJar) { |
||||||
|
this.mainJar = mainJar; |
||||||
|
} |
||||||
|
|
||||||
|
public String getMainClass() { |
||||||
|
return mainClass; |
||||||
|
} |
||||||
|
|
||||||
|
public void setMainClass(String mainClass) { |
||||||
|
this.mainClass = mainClass; |
||||||
|
} |
||||||
|
|
||||||
|
public String getDeployMode() { |
||||||
|
return deployMode; |
||||||
|
} |
||||||
|
|
||||||
|
public void setDeployMode(String deployMode) { |
||||||
|
this.deployMode = deployMode; |
||||||
|
} |
||||||
|
|
||||||
|
public String getMainArgs() { |
||||||
|
return mainArgs; |
||||||
|
} |
||||||
|
|
||||||
|
public void setMainArgs(String mainArgs) { |
||||||
|
this.mainArgs = mainArgs; |
||||||
|
} |
||||||
|
|
||||||
|
public int getSlot() { |
||||||
|
return slot; |
||||||
|
} |
||||||
|
|
||||||
|
public void setSlot(int slot) { |
||||||
|
this.slot = slot; |
||||||
|
} |
||||||
|
|
||||||
|
public String getAppName() { |
||||||
|
return appName; |
||||||
|
} |
||||||
|
|
||||||
|
public void setAppName(String appName) { |
||||||
|
this.appName = appName; |
||||||
|
} |
||||||
|
|
||||||
|
public int getTaskManager() { |
||||||
|
return taskManager; |
||||||
|
} |
||||||
|
|
||||||
|
public void setTaskManager(int taskManager) { |
||||||
|
this.taskManager = taskManager; |
||||||
|
} |
||||||
|
|
||||||
|
public String getJobManagerMemory() { |
||||||
|
return jobManagerMemory; |
||||||
|
} |
||||||
|
|
||||||
|
public void setJobManagerMemory(String jobManagerMemory) { |
||||||
|
this.jobManagerMemory = jobManagerMemory; |
||||||
|
} |
||||||
|
|
||||||
|
public String getTaskManagerMemory() { |
||||||
|
return taskManagerMemory; |
||||||
|
} |
||||||
|
|
||||||
|
public void setTaskManagerMemory(String taskManagerMemory) { |
||||||
|
this.taskManagerMemory = taskManagerMemory; |
||||||
|
} |
||||||
|
|
||||||
|
public String getQueue() { |
||||||
|
return queue; |
||||||
|
} |
||||||
|
|
||||||
|
public void setQueue(String queue) { |
||||||
|
this.queue = queue; |
||||||
|
} |
||||||
|
|
||||||
|
public List<ResourceInfo> getResourceList() { |
||||||
|
return resourceList; |
||||||
|
} |
||||||
|
|
||||||
|
public void setResourceList(List<ResourceInfo> resourceList) { |
||||||
|
this.resourceList = resourceList; |
||||||
|
} |
||||||
|
|
||||||
|
public String getOthers() { |
||||||
|
return others; |
||||||
|
} |
||||||
|
|
||||||
|
public void setOthers(String others) { |
||||||
|
this.others = others; |
||||||
|
} |
||||||
|
|
||||||
|
public ProgramType getProgramType() { |
||||||
|
return programType; |
||||||
|
} |
||||||
|
|
||||||
|
public void setProgramType(ProgramType programType) { |
||||||
|
this.programType = programType; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public boolean checkParameters() { |
||||||
|
return mainJar != null && programType != null; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
@Override |
||||||
|
public List<String> getResourceFilesList() { |
||||||
|
if(resourceList !=null ) { |
||||||
|
this.resourceList.add(mainJar); |
||||||
|
return resourceList.stream() |
||||||
|
.map(p -> p.getRes()).collect(Collectors.toList()); |
||||||
|
} |
||||||
|
return null; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
} |
@ -0,0 +1,104 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package cn.escheduler.dao.upgrade; |
||||||
|
|
||||||
|
import cn.escheduler.common.utils.ConnectionUtils; |
||||||
|
import cn.escheduler.dao.datasource.ConnectionFactory; |
||||||
|
import org.slf4j.Logger; |
||||||
|
import org.slf4j.LoggerFactory; |
||||||
|
|
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.ResultSet; |
||||||
|
import java.sql.SQLException; |
||||||
|
|
||||||
|
public class MysqlUpgradeDao extends UpgradeDao { |
||||||
|
|
||||||
|
public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); |
||||||
|
private static final String T_VERSION_NAME = "t_escheduler_version"; |
||||||
|
private static final String rootDir = System.getProperty("user.dir"); |
||||||
|
|
||||||
|
@Override |
||||||
|
protected void init() { |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
private static class MysqlUpgradeDaoHolder { |
||||||
|
private static final MysqlUpgradeDao INSTANCE = new MysqlUpgradeDao(); |
||||||
|
} |
||||||
|
|
||||||
|
private MysqlUpgradeDao() { |
||||||
|
} |
||||||
|
|
||||||
|
public static final MysqlUpgradeDao getInstance() { |
||||||
|
return MysqlUpgradeDaoHolder.INSTANCE; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* Determines whether a table exists |
||||||
|
* @param tableName |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
public boolean isExistsTable(String tableName) { |
||||||
|
Connection conn = null; |
||||||
|
try { |
||||||
|
conn = ConnectionFactory.getDataSource().getConnection(); |
||||||
|
ResultSet rs = conn.getMetaData().getTables(null, null, tableName, null); |
||||||
|
if (rs.next()) { |
||||||
|
return true; |
||||||
|
} else { |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
} catch (SQLException e) { |
||||||
|
logger.error(e.getMessage(),e); |
||||||
|
throw new RuntimeException(e.getMessage(),e); |
||||||
|
} finally { |
||||||
|
ConnectionUtils.releaseResource(null, null, conn); |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Determines whether a field exists in the specified table |
||||||
|
* @param tableName |
||||||
|
* @param columnName |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
public boolean isExistsColumn(String tableName,String columnName) { |
||||||
|
Connection conn = null; |
||||||
|
try { |
||||||
|
conn = ConnectionFactory.getDataSource().getConnection(); |
||||||
|
ResultSet rs = conn.getMetaData().getColumns(null,null,tableName,columnName); |
||||||
|
if (rs.next()) { |
||||||
|
return true; |
||||||
|
} else { |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
} catch (SQLException e) { |
||||||
|
logger.error(e.getMessage(),e); |
||||||
|
throw new RuntimeException(e.getMessage(),e); |
||||||
|
} finally { |
||||||
|
ConnectionUtils.releaseResource(null, null, conn); |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,132 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package cn.escheduler.dao.upgrade; |
||||||
|
|
||||||
|
import cn.escheduler.common.utils.ConnectionUtils; |
||||||
|
import cn.escheduler.dao.datasource.ConnectionFactory; |
||||||
|
import org.slf4j.Logger; |
||||||
|
import org.slf4j.LoggerFactory; |
||||||
|
|
||||||
|
import java.sql.Connection; |
||||||
|
import java.sql.PreparedStatement; |
||||||
|
import java.sql.ResultSet; |
||||||
|
import java.sql.SQLException; |
||||||
|
|
||||||
|
public class PostgresqlUpgradeDao extends UpgradeDao { |
||||||
|
|
||||||
|
public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); |
||||||
|
private static final String T_VERSION_NAME = "t_escheduler_version"; |
||||||
|
private static final String rootDir = System.getProperty("user.dir"); |
||||||
|
private static final String schema = getSchema(); |
||||||
|
|
||||||
|
@Override |
||||||
|
protected void init() { |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
private static class PostgresqlUpgradeDaoHolder { |
||||||
|
private static final PostgresqlUpgradeDao INSTANCE = new PostgresqlUpgradeDao(); |
||||||
|
} |
||||||
|
|
||||||
|
private PostgresqlUpgradeDao() { |
||||||
|
} |
||||||
|
|
||||||
|
public static final PostgresqlUpgradeDao getInstance() { |
||||||
|
return PostgresqlUpgradeDaoHolder.INSTANCE; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
@Override |
||||||
|
public void initSchema(String initSqlPath) { |
||||||
|
super.initSchema(initSqlPath); |
||||||
|
} |
||||||
|
|
||||||
|
private static String getSchema(){ |
||||||
|
Connection conn = null; |
||||||
|
PreparedStatement pstmt = null; |
||||||
|
try { |
||||||
|
conn = ConnectionFactory.getDataSource().getConnection(); |
||||||
|
pstmt = conn.prepareStatement("select current_schema()"); |
||||||
|
ResultSet resultSet = pstmt.executeQuery(); |
||||||
|
while (resultSet.next()){ |
||||||
|
if(resultSet.isFirst()){ |
||||||
|
return resultSet.getString(1); |
||||||
|
} |
||||||
|
} |
||||||
|
} catch (SQLException e) { |
||||||
|
logger.error(e.getMessage(),e); |
||||||
|
|
||||||
|
} finally { |
||||||
|
ConnectionUtils.releaseResource(null, null, conn); |
||||||
|
} |
||||||
|
return ""; |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Determines whether a table exists |
||||||
|
* @param tableName |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
public boolean isExistsTable(String tableName) { |
||||||
|
Connection conn = null; |
||||||
|
try { |
||||||
|
conn = ConnectionFactory.getDataSource().getConnection(); |
||||||
|
|
||||||
|
ResultSet rs = conn.getMetaData().getTables(null, schema, tableName, null); |
||||||
|
if (rs.next()) { |
||||||
|
return true; |
||||||
|
} else { |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
} catch (SQLException e) { |
||||||
|
logger.error(e.getMessage(),e); |
||||||
|
throw new RuntimeException(e.getMessage(),e); |
||||||
|
} finally { |
||||||
|
ConnectionUtils.releaseResource(null, null, conn); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Determines whether a field exists in the specified table |
||||||
|
* @param tableName |
||||||
|
* @param columnName |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
public boolean isExistsColumn(String tableName,String columnName) { |
||||||
|
Connection conn = null; |
||||||
|
try { |
||||||
|
conn = ConnectionFactory.getDataSource().getConnection(); |
||||||
|
ResultSet rs = conn.getMetaData().getColumns(null,schema,tableName,columnName); |
||||||
|
if (rs.next()) { |
||||||
|
return true; |
||||||
|
} else { |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
} catch (SQLException e) { |
||||||
|
logger.error(e.getMessage(),e); |
||||||
|
throw new RuntimeException(e.getMessage(),e); |
||||||
|
} finally { |
||||||
|
ConnectionUtils.releaseResource(null, null, conn); |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,110 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package cn.escheduler.server.utils; |
||||||
|
|
||||||
|
|
||||||
|
import cn.escheduler.common.Constants; |
||||||
|
import cn.escheduler.common.enums.ProgramType; |
||||||
|
import cn.escheduler.common.task.flink.FlinkParameters; |
||||||
|
import org.apache.commons.lang.StringUtils; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* spark args utils |
||||||
|
*/ |
||||||
|
public class FlinkArgsUtils { |
||||||
|
|
||||||
|
/** |
||||||
|
* build args |
||||||
|
* @param param |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
public static List<String> buildArgs(FlinkParameters param) { |
||||||
|
List<String> args = new ArrayList<>(); |
||||||
|
|
||||||
|
args.add(Constants.FLINK_RUN_MODE); //-m
|
||||||
|
|
||||||
|
args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster
|
||||||
|
|
||||||
|
if (param.getSlot() != 0) { |
||||||
|
args.add(Constants.FLINK_YARN_SLOT); |
||||||
|
args.add(String.format("%d", param.getSlot())); //-ys
|
||||||
|
} |
||||||
|
|
||||||
|
if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm
|
||||||
|
args.add(Constants.FLINK_APP_NAME); |
||||||
|
args.add(param.getAppName()); |
||||||
|
} |
||||||
|
|
||||||
|
if (param.getTaskManager() != 0) { //-yn
|
||||||
|
args.add(Constants.FLINK_TASK_MANAGE); |
||||||
|
args.add(String.format("%d", param.getTaskManager())); |
||||||
|
} |
||||||
|
|
||||||
|
if (StringUtils.isNotEmpty(param.getJobManagerMemory())) { |
||||||
|
args.add(Constants.FLINK_JOB_MANAGE_MEM); |
||||||
|
args.add(param.getJobManagerMemory()); //-yjm
|
||||||
|
} |
||||||
|
|
||||||
|
if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm
|
||||||
|
args.add(Constants.FLINK_TASK_MANAGE_MEM); |
||||||
|
args.add(param.getTaskManagerMemory()); |
||||||
|
} |
||||||
|
args.add(Constants.FLINK_detach); //-d
|
||||||
|
|
||||||
|
|
||||||
|
if(param.getProgramType() !=null ){ |
||||||
|
if(param.getProgramType()!=ProgramType.PYTHON){ |
||||||
|
if (StringUtils.isNotEmpty(param.getMainClass())) { |
||||||
|
args.add(Constants.FLINK_MAIN_CLASS); //-c
|
||||||
|
args.add(param.getMainClass()); //main class
|
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
if (param.getMainJar() != null) { |
||||||
|
args.add(param.getMainJar().getRes()); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
// --files --conf --libjar ...
|
||||||
|
if (StringUtils.isNotEmpty(param.getOthers())) { |
||||||
|
String others = param.getOthers(); |
||||||
|
if(!others.contains("--queue")){ |
||||||
|
if (StringUtils.isNotEmpty(param.getQueue())) { |
||||||
|
args.add(Constants.SPARK_QUEUE); |
||||||
|
args.add(param.getQueue()); |
||||||
|
} |
||||||
|
} |
||||||
|
args.add(param.getOthers()); |
||||||
|
}else if (StringUtils.isNotEmpty(param.getQueue())) { |
||||||
|
args.add(Constants.SPARK_QUEUE); |
||||||
|
args.add(param.getQueue()); |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
if (StringUtils.isNotEmpty(param.getMainArgs())) { |
||||||
|
args.add(param.getMainArgs()); |
||||||
|
} |
||||||
|
|
||||||
|
return args; |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,118 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package cn.escheduler.server.worker.task.flink; |
||||||
|
|
||||||
|
import cn.escheduler.common.process.Property; |
||||||
|
import cn.escheduler.common.task.AbstractParameters; |
||||||
|
import cn.escheduler.common.task.flink.FlinkParameters; |
||||||
|
import cn.escheduler.common.utils.JSONUtils; |
||||||
|
import cn.escheduler.common.utils.ParameterUtils; |
||||||
|
import cn.escheduler.dao.model.ProcessInstance; |
||||||
|
import cn.escheduler.server.utils.FlinkArgsUtils; |
||||||
|
import cn.escheduler.server.utils.ParamUtils; |
||||||
|
import cn.escheduler.server.worker.task.AbstractYarnTask; |
||||||
|
import cn.escheduler.server.worker.task.TaskProps; |
||||||
|
import org.apache.commons.lang3.StringUtils; |
||||||
|
import org.slf4j.Logger; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
/** |
||||||
|
* flink task |
||||||
|
*/ |
||||||
|
public class FlinkTask extends AbstractYarnTask { |
||||||
|
|
||||||
|
/** |
||||||
|
* flink command |
||||||
|
*/ |
||||||
|
private static final String FLINK_COMMAND = "flink"; |
||||||
|
private static final String FLINK_RUN = "run"; |
||||||
|
|
||||||
|
/** |
||||||
|
* flink parameters |
||||||
|
*/ |
||||||
|
private FlinkParameters flinkParameters; |
||||||
|
|
||||||
|
public FlinkTask(TaskProps props, Logger logger) { |
||||||
|
super(props, logger); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void init() { |
||||||
|
|
||||||
|
logger.info("flink task params {}", taskProps.getTaskParams()); |
||||||
|
|
||||||
|
flinkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), FlinkParameters.class); |
||||||
|
|
||||||
|
if (!flinkParameters.checkParameters()) { |
||||||
|
throw new RuntimeException("flink task params is not valid"); |
||||||
|
} |
||||||
|
flinkParameters.setQueue(taskProps.getQueue()); |
||||||
|
|
||||||
|
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { |
||||||
|
String args = flinkParameters.getMainArgs(); |
||||||
|
// get process instance by task instance id
|
||||||
|
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); |
||||||
|
|
||||||
|
/** |
||||||
|
* combining local and global parameters |
||||||
|
*/ |
||||||
|
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), |
||||||
|
taskProps.getDefinedParams(), |
||||||
|
flinkParameters.getLocalParametersMap(), |
||||||
|
processInstance.getCmdTypeIfComplement(), |
||||||
|
processInstance.getScheduleTime()); |
||||||
|
|
||||||
|
logger.info("param Map : {}", paramsMap); |
||||||
|
if (paramsMap != null ){ |
||||||
|
|
||||||
|
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); |
||||||
|
logger.info("param args : {}", args); |
||||||
|
} |
||||||
|
flinkParameters.setMainArgs(args); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* create command |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
@Override |
||||||
|
protected String buildCommand() { |
||||||
|
List<String> args = new ArrayList<>(); |
||||||
|
|
||||||
|
args.add(FLINK_COMMAND); |
||||||
|
args.add(FLINK_RUN); |
||||||
|
logger.info("flink task args : {}", args); |
||||||
|
// other parameters
|
||||||
|
args.addAll(FlinkArgsUtils.buildArgs(flinkParameters)); |
||||||
|
|
||||||
|
String command = ParameterUtils |
||||||
|
.convertParameterPlaceholders(String.join(" ", args), taskProps.getDefinedParams()); |
||||||
|
|
||||||
|
logger.info("flink task command : {}", command); |
||||||
|
|
||||||
|
return command; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public AbstractParameters getParameters() { |
||||||
|
return flinkParameters; |
||||||
|
} |
||||||
|
} |
@ -1 +1,4 @@ |
|||||||
logging.config=classpath:master_logback.xml |
logging.config=classpath:master_logback.xml |
||||||
|
|
||||||
|
# server port |
||||||
|
server.port=5566 |
||||||
|
@ -0,0 +1,4 @@ |
|||||||
|
logging.config=classpath:worker_logback.xml |
||||||
|
|
||||||
|
# server port |
||||||
|
server.port=7788 |
@ -0,0 +1,388 @@ |
|||||||
|
<template> |
||||||
|
<div class="flink-model"> |
||||||
|
<m-list-box> |
||||||
|
<div slot="text">{{$t('Program Type')}}</div> |
||||||
|
<div slot="content"> |
||||||
|
<x-select |
||||||
|
style="width: 130px;" |
||||||
|
v-model="programType" |
||||||
|
:disabled="isDetails"> |
||||||
|
<x-option |
||||||
|
v-for="city in programTypeList" |
||||||
|
:key="city.code" |
||||||
|
:value="city.code" |
||||||
|
:label="city.code"> |
||||||
|
</x-option> |
||||||
|
</x-select> |
||||||
|
</div> |
||||||
|
</m-list-box> |
||||||
|
|
||||||
|
<m-list-box v-if="programType !== 'PYTHON'"> |
||||||
|
<div slot="text">{{$t('Main class')}}</div> |
||||||
|
<div slot="content"> |
||||||
|
<x-input |
||||||
|
:disabled="isDetails" |
||||||
|
type="input" |
||||||
|
v-model="mainClass" |
||||||
|
:placeholder="$t('Please enter main class')" |
||||||
|
autocomplete="off"> |
||||||
|
</x-input> |
||||||
|
</div> |
||||||
|
</m-list-box> |
||||||
|
<m-list-box> |
||||||
|
<div slot="text">{{$t('Main jar package')}}</div> |
||||||
|
<div slot="content"> |
||||||
|
<x-select |
||||||
|
style="width: 100%;" |
||||||
|
:placeholder="$t('Please enter main jar package')" |
||||||
|
v-model="mainJar" |
||||||
|
filterable |
||||||
|
:disabled="isDetails"> |
||||||
|
<x-option |
||||||
|
v-for="city in mainJarList" |
||||||
|
:key="city.code" |
||||||
|
:value="city.code" |
||||||
|
:label="city.code"> |
||||||
|
</x-option> |
||||||
|
</x-select> |
||||||
|
</div> |
||||||
|
</m-list-box> |
||||||
|
<m-list-box> |
||||||
|
<div slot="text">{{$t('Deploy Mode')}}</div> |
||||||
|
<div slot="content"> |
||||||
|
<x-radio-group v-model="deployMode"> |
||||||
|
<x-radio :label="'cluster'" :disabled="isDetails"></x-radio> |
||||||
|
</x-radio-group> |
||||||
|
</div> |
||||||
|
</m-list-box> |
||||||
|
<div class="list-box-4p"> |
||||||
|
<div class="clearfix list"> |
||||||
|
<span class="sp1">{{$t('slot')}}</span> |
||||||
|
<span class="sp2"> |
||||||
|
<x-input |
||||||
|
:disabled="isDetails" |
||||||
|
type="input" |
||||||
|
v-model="slot" |
||||||
|
:placeholder="$t('Please enter driver core number')" |
||||||
|
style="width: 200px;" |
||||||
|
autocomplete="off"> |
||||||
|
</x-input> |
||||||
|
</span> |
||||||
|
<span class="sp1 sp3">{{$t('taskManager')}}</span> |
||||||
|
<span class="sp2"> |
||||||
|
<x-input |
||||||
|
:disabled="isDetails" |
||||||
|
type="input" |
||||||
|
v-model="taskManager" |
||||||
|
:placeholder="$t('Please enter driver memory use')" |
||||||
|
style="width: 186px;" |
||||||
|
autocomplete="off"> |
||||||
|
</x-input> |
||||||
|
</span> |
||||||
|
</div> |
||||||
|
<div class="clearfix list"> |
||||||
|
<span class="sp1">{{$t('jobManagerMemory')}}</span> |
||||||
|
<span class="sp2"> |
||||||
|
<x-input |
||||||
|
:disabled="isDetails" |
||||||
|
type="input" |
||||||
|
v-model="jobManagerMemory" |
||||||
|
:placeholder="$t('Please enter the number of Executor')" |
||||||
|
style="width: 200px;" |
||||||
|
autocomplete="off"> |
||||||
|
</x-input> |
||||||
|
</span> |
||||||
|
<span class="sp1 sp3">{{$t('taskManagerMemory')}}</span> |
||||||
|
<span class="sp2"> |
||||||
|
<x-input |
||||||
|
:disabled="isDetails" |
||||||
|
type="input" |
||||||
|
v-model="taskManagerMemory" |
||||||
|
:placeholder="$t('Please enter the Executor memory')" |
||||||
|
style="width: 186px;" |
||||||
|
autocomplete="off"> |
||||||
|
</x-input> |
||||||
|
</span> |
||||||
|
</div> |
||||||
|
|
||||||
|
</div> |
||||||
|
<m-list-box> |
||||||
|
<div slot="text">{{$t('Command-line parameters')}}</div> |
||||||
|
<div slot="content"> |
||||||
|
<x-input |
||||||
|
:autosize="{minRows:2}" |
||||||
|
:disabled="isDetails" |
||||||
|
type="textarea" |
||||||
|
v-model="mainArgs" |
||||||
|
:placeholder="$t('Please enter Command-line parameters')" |
||||||
|
autocomplete="off"> |
||||||
|
</x-input> |
||||||
|
</div> |
||||||
|
</m-list-box> |
||||||
|
<m-list-box> |
||||||
|
<div slot="text">{{$t('Other parameters')}}</div> |
||||||
|
<div slot="content"> |
||||||
|
<x-input |
||||||
|
:disabled="isDetails" |
||||||
|
:autosize="{minRows:2}" |
||||||
|
type="textarea" |
||||||
|
v-model="others" |
||||||
|
:placeholder="$t('Please enter other parameters')"> |
||||||
|
</x-input> |
||||||
|
</div> |
||||||
|
</m-list-box> |
||||||
|
<m-list-box> |
||||||
|
<div slot="text">{{$t('Resources')}}</div> |
||||||
|
<div slot="content"> |
||||||
|
<m-resources |
||||||
|
ref="refResources" |
||||||
|
@on-resourcesData="_onResourcesData" |
||||||
|
:resource-list="resourceList"> |
||||||
|
</m-resources> |
||||||
|
</div> |
||||||
|
</m-list-box> |
||||||
|
<m-list-box> |
||||||
|
<div slot="text">{{$t('Custom Parameters')}}</div> |
||||||
|
<div slot="content"> |
||||||
|
<m-local-params |
||||||
|
ref="refLocalParams" |
||||||
|
@on-local-params="_onLocalParams" |
||||||
|
:udp-list="localParams" |
||||||
|
:hide="false"> |
||||||
|
</m-local-params> |
||||||
|
</div> |
||||||
|
</m-list-box> |
||||||
|
</div> |
||||||
|
</template> |
||||||
|
<script> |
||||||
|
import _ from 'lodash' |
||||||
|
import i18n from '@/module/i18n' |
||||||
|
import mLocalParams from './_source/localParams' |
||||||
|
import mListBox from './_source/listBox' |
||||||
|
import mResources from './_source/resources' |
||||||
|
import disabledState from '@/module/mixin/disabledState' |
||||||
|
|
||||||
|
export default { |
||||||
|
name: 'flink', |
||||||
|
data () { |
||||||
|
return { |
||||||
|
// Main function class |
||||||
|
mainClass: '', |
||||||
|
// Master jar package |
||||||
|
mainJar: null, |
||||||
|
// Master jar package(List) |
||||||
|
mainJarList: [], |
||||||
|
// Deployment method |
||||||
|
deployMode: 'cluster', |
||||||
|
// Resource(list) |
||||||
|
resourceList: [], |
||||||
|
// Custom function |
||||||
|
localParams: [], |
||||||
|
// Driver Number of cores |
||||||
|
slot: 1, |
||||||
|
// Driver Number of memory |
||||||
|
taskManager: '2', |
||||||
|
// Executor Number |
||||||
|
jobManagerMemory: '1G', |
||||||
|
// Executor Number of memory |
||||||
|
taskManagerMemory: '2G', |
||||||
|
// Executor Number of cores |
||||||
|
executorCores: 2, |
||||||
|
// Command line argument |
||||||
|
mainArgs: '', |
||||||
|
// Other parameters |
||||||
|
others: '', |
||||||
|
// Program type |
||||||
|
programType: 'SCALA', |
||||||
|
// Program type(List) |
||||||
|
programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }] |
||||||
|
} |
||||||
|
}, |
||||||
|
props: { |
||||||
|
backfillItem: Object |
||||||
|
}, |
||||||
|
mixins: [disabledState], |
||||||
|
methods: { |
||||||
|
/** |
||||||
|
* return localParams |
||||||
|
*/ |
||||||
|
_onLocalParams (a) { |
||||||
|
this.localParams = a |
||||||
|
}, |
||||||
|
/** |
||||||
|
* return resourceList |
||||||
|
*/ |
||||||
|
_onResourcesData (a) { |
||||||
|
this.resourceList = a |
||||||
|
}, |
||||||
|
/** |
||||||
|
* verification |
||||||
|
*/ |
||||||
|
_verification () { |
||||||
|
if (this.programType !== 'PYTHON' && !this.mainClass) { |
||||||
|
this.$message.warning(`${i18n.$t('Please enter main class')}`) |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
if (!this.mainJar) { |
||||||
|
this.$message.warning(`${i18n.$t('Please enter main jar package')}`) |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
if (!this.jobManagerMemory) { |
||||||
|
this.$message.warning(`${i18n.$t('Please enter the number of Executor')}`) |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
if (!Number.isInteger(parseInt(this.jobManagerMemory))) { |
||||||
|
this.$message.warning(`${i18n.$t('The number of Executors should be a positive integer')}`) |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
if (!this.taskManagerMemory) { |
||||||
|
this.$message.warning(`${i18n.$t('Please enter the Executor memory')}`) |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
if (!this.taskManagerMemory) { |
||||||
|
this.$message.warning(`${i18n.$t('Please enter the Executor memory')}`) |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
if (!_.isNumber(parseInt(this.taskManagerMemory))) { |
||||||
|
this.$message.warning(`${i18n.$t('Memory should be a positive integer')}`) |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
if (!this.executorCores) { |
||||||
|
this.$message.warning(`${i18n.$t('Please enter ExecutorPlease enter Executor core number')}`) |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
if (!Number.isInteger(parseInt(this.executorCores))) { |
||||||
|
this.$message.warning(`${i18n.$t('Core number should be positive integer')}`) |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
if (!this.$refs.refResources._verifResources()) { |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
// localParams Subcomponent verification |
||||||
|
if (!this.$refs.refLocalParams._verifProp()) { |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
// storage |
||||||
|
this.$emit('on-params', { |
||||||
|
mainClass: this.mainClass, |
||||||
|
mainJar: { |
||||||
|
res: this.mainJar |
||||||
|
}, |
||||||
|
deployMode: this.deployMode, |
||||||
|
resourceList: this.resourceList, |
||||||
|
localParams: this.localParams, |
||||||
|
slot: this.slot, |
||||||
|
taskManager: this.taskManager, |
||||||
|
jobManagerMemory: this.jobManagerMemory, |
||||||
|
taskManagerMemory: this.taskManagerMemory, |
||||||
|
executorCores: this.executorCores, |
||||||
|
mainArgs: this.mainArgs, |
||||||
|
others: this.others, |
||||||
|
programType: this.programType |
||||||
|
}) |
||||||
|
return true |
||||||
|
}, |
||||||
|
/** |
||||||
|
* get resources list |
||||||
|
*/ |
||||||
|
_getResourcesList () { |
||||||
|
return new Promise((resolve, reject) => { |
||||||
|
let isJar = (alias) => { |
||||||
|
return alias.substring(alias.lastIndexOf('.') + 1, alias.length) !== 'jar' |
||||||
|
} |
||||||
|
this.mainJarList = _.map(_.cloneDeep(this.store.state.dag.resourcesListS), v => { |
||||||
|
return { |
||||||
|
id: v.id, |
||||||
|
code: v.alias, |
||||||
|
disabled: isJar(v.alias) |
||||||
|
} |
||||||
|
}) |
||||||
|
resolve() |
||||||
|
}) |
||||||
|
} |
||||||
|
}, |
||||||
|
watch: { |
||||||
|
// Listening type |
||||||
|
programType (type) { |
||||||
|
if (type === 'PYTHON') { |
||||||
|
this.mainClass = '' |
||||||
|
} |
||||||
|
} |
||||||
|
}, |
||||||
|
created () { |
||||||
|
this._getResourcesList().then(() => { |
||||||
|
let o = this.backfillItem |
||||||
|
|
||||||
|
// Non-null objects represent backfill |
||||||
|
if (!_.isEmpty(o)) { |
||||||
|
this.mainClass = o.params.mainClass || '' |
||||||
|
this.mainJar = o.params.mainJar.res || '' |
||||||
|
this.deployMode = o.params.deployMode || '' |
||||||
|
this.slot = o.params.slot || 1 |
||||||
|
this.taskManager = o.params.taskManager || '2' |
||||||
|
this.jobManagerMemory = o.params.jobManagerMemory || '1G' |
||||||
|
this.taskManagerMemory = o.params.taskManagerMemory || '2G' |
||||||
|
|
||||||
|
this.mainArgs = o.params.mainArgs || '' |
||||||
|
this.others = o.params.others |
||||||
|
this.programType = o.params.programType || 'SCALA' |
||||||
|
|
||||||
|
// backfill resourceList |
||||||
|
let resourceList = o.params.resourceList || [] |
||||||
|
if (resourceList.length) { |
||||||
|
this.resourceList = resourceList |
||||||
|
} |
||||||
|
|
||||||
|
// backfill localParams |
||||||
|
let localParams = o.params.localParams || [] |
||||||
|
if (localParams.length) { |
||||||
|
this.localParams = localParams |
||||||
|
} |
||||||
|
} |
||||||
|
}) |
||||||
|
}, |
||||||
|
mounted () { |
||||||
|
|
||||||
|
}, |
||||||
|
components: { mLocalParams, mListBox, mResources } |
||||||
|
} |
||||||
|
</script> |
||||||
|
|
||||||
|
<style lang="scss" rel="stylesheet/scss"> |
||||||
|
.flink-model { |
||||||
|
.list-box-4p { |
||||||
|
.list { |
||||||
|
margin-bottom: 14px; |
||||||
|
.sp1 { |
||||||
|
float: left; |
||||||
|
width: 112px; |
||||||
|
text-align: right; |
||||||
|
margin-right: 10px; |
||||||
|
font-size: 14px; |
||||||
|
color: #777; |
||||||
|
display: inline-block; |
||||||
|
padding-top: 6px; |
||||||
|
} |
||||||
|
.sp2 { |
||||||
|
float: left; |
||||||
|
margin-right: 4px; |
||||||
|
} |
||||||
|
.sp3 { |
||||||
|
width: 176px; |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
</style> |
After Width: | Height: | Size: 79 KiB |
After Width: | Height: | Size: 121 KiB |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue