
REPORT-10696 Upload HDFS resource repository plugin

release/10.0
rinoux 6 years ago
commit e16ec06d0a
  1. build.xml (+120)
  2. img/db_fields.png (BIN)
  3. img/set.png (BIN)
  4. lib/hdfs-all-2.9.0.jar (BIN)
  5. plugin.xml (+20)
  6. readme.md (+22)
  7. src/com/fr/plugin/hdfs/repository/HDFSFactoryProvider.java (+80)
  8. src/com/fr/plugin/hdfs/repository/core/HDFSConfig.java (+16)
  9. src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryFactory.java (+112)
  10. src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryProfile.java (+37)
  11. src/com/fr/plugin/hdfs/repository/core/HDFSResourceRepository.java (+299)

build.xml (+120)

@@ -0,0 +1,120 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project basedir="." default="jar" name="plugin">
<!-- JDK path; change to the actual location on your machine -->
<property name="jdk.home" value="/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home"/>
<property name="libs" value="${basedir}/lib"/>
<property name="publicLibs" value=""/>
<property name="destLoc" value="."/>
<property name="classes" value="classes"/>
<xmlproperty file="${basedir}/plugin.xml"/>
<property name="current-version" value="${plugin.version}"/>
<!-- Plugin version -->
<property name="plugin-version" value="${current-version}"/>
<!-- Plugin name -->
<property name="plugin-name" value="hdfs-repository"/>
<property name="plugin-jar" value="fr-plugin-${plugin-name}-${plugin-version}.jar"/>
<target name="prepare">
<delete dir="${classes}"/>
<delete dir="fr-plugin-${plugin-name}-${plugin-version}"/>
<xmlproperty file="${basedir}/plugin.xml"/>
<delete dir="${destLoc}/${plugin.name}"/>
</target>
<path id="compile.classpath">
<fileset dir="${libs}">
<include name="**/*.jar"/>
</fileset>
<fileset dir="${publicLibs}">
<include name="**/*.jar"/>
</fileset>
</path>
<patternset id="resources4Jar">
<exclude name="**/.settings/**"/>
<exclude name=".classpath"/>
<exclude name=".project"/>
<exclude name="**/*.java"/>
<exclude name="**/*.db"/>
<exclude name="**/*.g"/>
<exclude name="**/package.html"/>
</patternset>
<target name="copy_resources">
<echo message="从${resources_from}拷贝图片,JS,CSS等资源文件"/>
<delete dir="tmp"/>
<copy todir="tmp">
<fileset dir="${resources_from}\src">
<patternset refid="resources4Jar"/>
</fileset>
</copy>
<copy todir="${classes}">
<fileset dir="tmp"/>
</copy>
<delete dir="tmp"/>
</target>
<target name="compile_javas">
<echo message="编译${compile_files}下的Java文件"/>
<javac destdir="${classes}" debug="false" optimize="on" source="${source_jdk_version}"
target="${target_jdk_version}"
fork="true" memoryMaximumSize="512m" listfiles="false" srcdir="${basedir}"
executable="${compile_jdk_version}/bin/javac">
<src path="${basedir}/src"/>
<exclude name="**/.svn/**"/>
<compilerarg line="-encoding UTF8 "/>
<classpath refid="compile.classpath"/>
</javac>
</target>
<target name="jar_classes">
<echo message="打Jar包:${jar_name}"/>
<delete file="${basedir}/${jar_name}"/>
<unjar dest="${classes}">
<fileset dir="${libs}" includes="hdfs-all-2.9.0.jar"/>
</unjar>
<jar jarfile="${basedir}/${jar_name}">
<fileset dir="${classes}">
</fileset>
</jar>
</target>
<target name="super_jar" depends="prepare">
<antcall target="copy_resources">
<param name="resources_from" value="${basedir}"/>
</antcall>
<antcall target="compile_javas">
<param name="source_jdk_version" value="1.6"/>
<param name="target_jdk_version" value="1.6"/>
<param name="compile_jdk_version" value="${jdk.home}"/>
<param name="compile_files" value="${basedir}/src"/>
</antcall>
<echo message="compile plugin success!"/>
<antcall target="jar_classes">
<param name="jar_name" value="${plugin-jar}"/>
</antcall>
<delete dir="${classes}"/>
</target>
<target name="jar" depends="super_jar">
<antcall target="zip"/>
</target>
<target name="zip">
<property name="plugin-folder" value="fr-plugin-${plugin-name}-${plugin-version}"/>
<echo message="----------zip files----------"/>
<mkdir dir="${plugin-folder}"/>
<copy todir="${plugin-folder}">
<fileset dir=".">
<include name="${plugin-jar}"/>
<include name="plugin.xml"/>
</fileset>
</copy>
<zip destfile="${basedir}/${plugin-folder}.zip" basedir=".">
<include name="${plugin-folder}/*.jar"/>
<include name="${plugin-folder}/plugin.xml"/>
</zip>
<xmlproperty file="${basedir}/plugin.xml"/>
<move file="${plugin-folder}.zip" todir="${destLoc}/${plugin.name}"/>
</target>
</project>

img/db_fields.png (BIN)

Binary file added (110 KiB).

img/set.png (BIN)

Binary file added (33 KiB).

lib/hdfs-all-2.9.0.jar (BIN)

Binary file added.

plugin.xml (+20)

@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<plugin>
<id>com.fr.plugin.hdfs.repository</id>
<name><![CDATA[HDFS Resource Repository]]></name>
<active>yes</active>
<hidden>no</hidden>
<version>1.0</version>
<env-version>10.0</env-version>
<jartime>2018-03-13</jartime>
<vendor>rinoux</vendor>
<description><![CDATA[Description]]></description>
<change-notes><![CDATA[None]]></change-notes>
<!-- Main package -->
<main-package>com.fr.plugin</main-package>
<!-- Function recorder class -->
<function-recorder class="com.fr.plugin.hdfs.repository.HDFSFactoryProvider"/>
<extra-core>
<ResourceRepositoryFactoryProvider class="com.fr.plugin.hdfs.repository.HDFSFactoryProvider"/>
</extra-core>
</plugin>

readme.md (+22)

@@ -0,0 +1,22 @@
# Development and Testing Notes
- 1. The HDFS repository plugin is an extension of the platform file server; first install the plugin in the usual way.
- 2. Until the platform provides a visual settings page, HDFS can be enabled by adding database fields:
![fields](./img/db_fields.png)
> In the figure above, ResourceModuleConfig.profiles.HDFS_TEST.workroots.[your machine id (copy it from the LOCAL_ENV row in the table)] is the HDFS working path.
> Set host to the Hadoop HDFS address (default localhost) and port to the HDFS port (default 9000).
> If user groups are configured, the corresponding username is also required.
- 3. Add a database field to set the repository to HDFS:
![settings](./img/set.png)
- 4. Start the designer or the platform.
- 5. Other notes:
If editing the database is too cumbersome, you can add code directly at a suitable place (for example, at the end of the com.fr.io.ResourceRepositoryActivator.doExtra() method):
```
Map<String, Object> hdfsConfig = new HashMap<String, Object>();
hdfsConfig.put("host", "localhost");
hdfsConfig.put("port", 9000);
try {
    ProfileFactory.create("HDFS", "HDFS_TEST", "/app", true, hdfsConfig).install().apply();
} catch (RepositoryException e) {
    e.printStackTrace();
}
```
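For cleanup after testing, the reverse switch can be sketched from the lifecycle handling that appears (commented out) in HDFSFactoryProvider later in this commit; treat it as an illustrative sketch rather than a confirmed recipe:

```
try {
    // Switch the active repository back to the local environment first,
    // then discard and uninstall the test repository.
    ResourceModuleContext.apply(ProjectConstants.LOCAL_ENV, RepositoryApplyPolicy.EXCLUSIVE);
    ResourceModuleContext.discard("HDFS_TEST");
    ResourceModuleContext.uninstall("HDFS_TEST");
} catch (RepositoryException e) {
    e.printStackTrace();
}
```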

src/com/fr/plugin/hdfs/repository/HDFSFactoryProvider.java (+80)

@@ -0,0 +1,80 @@
package com.fr.plugin.hdfs.repository;
import com.fr.event.Event;
import com.fr.event.EventDispatcher;
import com.fr.event.Listener;
import com.fr.io.base.exception.RepositoryException;
import com.fr.io.base.provider.RepositoryFactoryProvider;
import com.fr.io.context.ResourceModuleContext;
import com.fr.io.context.info.InstalledComponent;
import com.fr.io.context.info.RepositoryApplyPolicy;
import com.fr.io.fun.AbstractRepositoryFactoryProvider;
import com.fr.log.FineLoggerFactory;
import com.fr.plugin.context.PluginContext;
import com.fr.plugin.hdfs.repository.core.HDFSConfig;
import com.fr.plugin.hdfs.repository.core.HDFSRepositoryFactory;
import com.fr.plugin.observer.PluginEventType;
import com.fr.plugin.transform.ExecuteFunctionRecord;
import com.fr.plugin.transform.FunctionRecorder;
import com.fr.stable.project.ProjectConstants;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Created by rinoux on 2018/8/10.
*/
@FunctionRecorder
public class HDFSFactoryProvider extends AbstractRepositoryFactoryProvider<HDFSConfig> {
private static HDFSRepositoryFactory factory = new HDFSRepositoryFactory();
/* static {
Listener<PluginContext> stopListener = new Listener<PluginContext>() {
@Override
public void on(Event event, PluginContext param) {
if (param.getMarker().getPluginID().equals("com.fr.plugin.hdfs.repository")) {
try {
if (ResourceModuleContext.getCurrentRepo().getIdentity().equals(HDFSRepositoryFactory.IDENTITY)) {
ResourceModuleContext.apply(ProjectConstants.LOCAL_ENV, RepositoryApplyPolicy.EXCLUSIVE);
}
Map<String, InstalledComponent> all = ResourceModuleContext.getInstaller().findAll();
List<InstalledComponent> components = new ArrayList<InstalledComponent>(all.values());
for (InstalledComponent component : components) {
if (component.getFactory().getIdentity().equals(HDFSRepositoryFactory.IDENTITY)) {
ResourceModuleContext.discard(component.getRepoName());
ResourceModuleContext.uninstall(component.getRepoName());
}
}
} catch (RepositoryException e) {
FineLoggerFactory.getLogger().error(e.getMessage());
} finally {
ResourceModuleContext.removeFactory(HDFSRepositoryFactory.IDENTITY);
}
}
}
};
Listener<PluginContext> startListener = new Listener<PluginContext>() {
@Override
public void on(Event event, PluginContext param) {
ResourceModuleContext.getFactoryLoader().add(factory);
}
};
EventDispatcher.listen(PluginEventType.BeforeStop, stopListener);
EventDispatcher.listen(PluginEventType.BeforeUnInstall, stopListener);
EventDispatcher.listen(PluginEventType.AfterActive, startListener);
EventDispatcher.listen(PluginEventType.AfterInstall, startListener);
}*/
@Override
@ExecuteFunctionRecord
public RepositoryFactoryProvider<HDFSConfig> getFactory() {
return factory;
}
}

src/com/fr/plugin/hdfs/repository/core/HDFSConfig.java (+16)

@@ -0,0 +1,16 @@
package com.fr.plugin.hdfs.repository.core;
import com.fr.config.holder.Conf;
import com.fr.config.holder.factory.Holders;
import com.fr.io.config.CommonRepoConfig;
/**
* Created by rinoux on 2018/8/10.
*/
public class HDFSConfig extends CommonRepoConfig {
public HDFSConfig() {
super(HDFSRepositoryFactory.IDENTITY);
}
// TODO: 2018/8/10 config fields
}
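HDFSConfig is still a stub: the Conf/Holders imports are unused and the TODO marks the pending fields. The host, port and username used by the factory below appear to be inherited from CommonRepoConfig (HDFSConfig declares none of them), so the TODO presumably covers HDFS-specific options. A minimal sketch of one such field, following the same Conf/Holders pattern used by HDFSRepositoryProfile; the field name and default are hypothetical:

```
// Hypothetical HDFS-specific option (requires com.fr.config.Identifier);
// the name and default value are assumptions, not part of this commit.
@Identifier("replication")
private Conf<Integer> replication = Holders.obj(3, Integer.class);

public int getReplication() {
    return replication.get();
}

public void setReplication(int replication) {
    this.replication.set(replication);
}
```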

src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryFactory.java (+112)

@@ -0,0 +1,112 @@
package com.fr.plugin.hdfs.repository.core;
import com.fr.general.GeneralUtils;
import com.fr.io.base.provider.impl.ConfigRepositoryFactory;
import com.fr.io.context.info.RepositoryProfile;
import com.fr.io.repository.ResourceRepository;
import com.fr.log.FineLoggerFactory;
import com.fr.stable.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
/**
* Created by rinoux on 2018/8/10.
*/
public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> {
public static final String IDENTITY = "HDFS";
private static final String HDFS_SCHEMA = "hdfs://";
private static final String DEFAULT_HOST = "localhost";
private static final String DEFAULT_PORT = "9000";
public HDFSRepositoryFactory() {
super(IDENTITY);
}
@Override
public Class<? extends RepositoryProfile<HDFSConfig>> getProfileClass() {
return HDFSRepositoryProfile.class;
}
@Override
public Class<HDFSConfig> getConfigClass() {
return HDFSConfig.class;
}
@Override
public boolean verifyConfig(HDFSConfig config) {
FileSystem fileSystem = null;
try {
fileSystem = createFileSystem(config);
return fileSystem.exists(new Path("/"));
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage());
} finally {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException ignored) {
}
}
}
return false;
}
@Override
public ResourceRepository produce(String repoName, String workRoot, HDFSConfig config) {
return new HDFSResourceRepository(repoName, workRoot, createFileSystem(config));
}
private FileSystem createFileSystem(HDFSConfig config) {
FileSystem fs;
Configuration conf = createConfiguration();
// Build the HDFS URL from the config
String host = StringUtils.isEmpty(config.getHost()) ? DEFAULT_HOST : config.getHost();
String port = config.getPort() == 0 ? DEFAULT_PORT : String.valueOf(config.getPort());
String hdfsUrl = HDFS_SCHEMA.concat(host).concat(":").concat(port);
String user = config.getUsername();
try {
if (StringUtils.isNotEmpty(user)) {
fs = FileSystem.get(URI.create(hdfsUrl), conf, user);
} else {
fs = FileSystem.get(URI.create(hdfsUrl), conf);
}
return fs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Initialize the hadoop-hdfs configuration.
*
* @return the configuration
*/
private Configuration createConfiguration() {
Configuration conf = new Configuration(true);
// Only GeneralUtils can load the FileSystem implementation classes here
conf.setClassLoader(new ClassLoader() {
@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
return GeneralUtils.classForName(name);
}
});
// Register the implementation for the hdfs scheme
conf.setClass("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class, FileSystem.class);
return conf;
}
}
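For reference, a minimal sketch of how this factory is meant to be exercised; the setHost/setPort setters are assumed to be inherited from CommonRepoConfig (this commit only shows the getters being called), so the exact names are unconfirmed:

```
HDFSRepositoryFactory factory = new HDFSRepositoryFactory();
HDFSConfig config = new HDFSConfig();
config.setHost("localhost"); // setter assumed from CommonRepoConfig
config.setPort(9000);        // setter assumed from CommonRepoConfig
if (factory.verifyConfig(config)) {
    // produce() wraps a freshly connected FileSystem in a resource repository
    ResourceRepository repo = factory.produce("HDFS_TEST", "/app", config);
}
```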

src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryProfile.java (+37)

@@ -0,0 +1,37 @@
package com.fr.plugin.hdfs.repository.core;
import com.fr.config.Identifier;
import com.fr.config.holder.Conf;
import com.fr.config.holder.factory.Holders;
import com.fr.io.context.info.RepositoryProfile;
/**
* Created by rinoux on 2018/8/10.
*/
public class HDFSRepositoryProfile extends RepositoryProfile<HDFSConfig> {
public HDFSRepositoryProfile() {
}
@Identifier("hdfsConfig")
private Conf<HDFSConfig> hdfsConfig = Holders.obj(null, HDFSConfig.class);
@Override
public HDFSConfig getConfig() {
return hdfsConfig.get();
}
@Override
public void setConfig(HDFSConfig config) {
this.hdfsConfig.set(config);
}
@Override
public RepositoryProfile<HDFSConfig> clone() throws CloneNotSupportedException {
HDFSRepositoryProfile cloned = (HDFSRepositoryProfile) super.clone();
cloned.hdfsConfig = (Conf<HDFSConfig>) this.hdfsConfig.clone();
return cloned;
}
}

src/com/fr/plugin/hdfs/repository/core/HDFSResourceRepository.java (+299)

@@ -0,0 +1,299 @@
package com.fr.plugin.hdfs.repository.core;
import com.fr.io.base.exception.ConnectionException;
import com.fr.io.repository.FineFileEntry;
import com.fr.io.repository.base.BaseResourceRepository;
import com.fr.log.FineLoggerFactory;
import com.fr.stable.Filter;
import com.fr.workspace.resource.ResourceIOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
* Created by rinoux on 2018/8/10.
*/
public class HDFSResourceRepository extends BaseResourceRepository {
private FileSystem fs;
public HDFSResourceRepository(String repoName, String workRoot, FileSystem fs) {
super(repoName, workRoot);
this.fs = fs;
}
@Override
public String getSeparator() {
return "/";
}
@Override
public FineFileEntry getEntry(String path) {
try {
FileStatus status = fs.getFileStatus(new Path(path));
return convertToFineFileEntry(status);
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
return null;
}
}
private FineFileEntry convertToFineFileEntry(FileStatus status) {
FineFileEntry entry = new FineFileEntry(status.getPath().toUri().getPath());
entry.setDirectory(status.isDirectory());
entry.setSize(status.getLen());
entry.setTimestamp(status.getModificationTime());
entry.setName(status.getPath().getName());
return entry;
}
@Override
public FineFileEntry[] listEntry(String dir) {
List<FineFileEntry> entryList = new ArrayList<FineFileEntry>();
try {
FileStatus[] statuses = fs.listStatus(new Path(dir));
for (FileStatus status : statuses) {
entryList.add(convertToFineFileEntry(status));
}
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
return entryList.toArray(new FineFileEntry[0]);
}
@Override
public URL getResource(String path) {
try {
FileStatus status = fs.getFileStatus(new Path(path));
return new URL(status.getPath().toUri().toString());
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
return null;
}
@Override
public InputStream read(String file) throws ResourceIOException {
try {
return fs.open(new Path(file));
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
throw new ResourceIOException(e);
}
}
@Override
public void write(String path, byte[] data) {
OutputStream os = null;
try {
os = fs.create(new Path(path), true);
os.write(data);
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
} finally {
try {
if (os != null) {
os.flush();
os.close();
}
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
}
@Override
public boolean createFile(String path) {
try {
return fs.createNewFile(new Path(path));
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
return false;
}
@Override
public boolean createDirectory(String path) {
try {
return fs.mkdirs(new Path(path));
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
return false;
}
@Override
public boolean delete(String path) {
try {
return fs.delete(new Path(path), true);
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
return false;
}
@Override
public boolean exist(String path) {
try {
return fs.exists(new Path(path));
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage());
}
return false;
}
@Override
public String[] list(String dir, final Filter<String> filter) {
List<String> result = new ArrayList<String>();
try {
FileStatus[] statuses = fs.listStatus(new Path(dir), new PathFilter() {
@Override
public boolean accept(Path path) {
return filter.accept(path.getName());
}
});
for (FileStatus status : statuses) {
result.add(status.getPath().getName());
}
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage());
}
return result.toArray(new String[0]);
}
@Override
public boolean isDirectory(String path) {
try {
return fs.isDirectory(new Path(path));
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
return false;
}
@Override
public long lastModified(String path) {
try {
return fs.getFileStatus(new Path(path)).getModificationTime();
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
return 0L;
}
@Override
public long length(String path) {
try {
return fs.getFileStatus(new Path(path)).getLen();
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
return 0L;
}
@Override
public boolean rename(String path, String newPath) throws ResourceIOException {
try {
return fs.rename(new Path(path), new Path(newPath));
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage());
}
return false;
}
@Override
public void appendWrite(String file, byte[] data) throws ResourceIOException {
FSDataOutputStream out = null;
try {
out = fs.append(new Path(file));
out.write(data);
} catch (ConnectException e) {
throw new ConnectionException();
} catch (IOException e) {
throw new ResourceIOException(e);
} finally {
if (out != null) {
try {
out.flush();
out.close();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage());
}
}
}
}
@Override
public void shutDown() {
try {
fs.close();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
@Override
public String getIdentity() {
return HDFSRepositoryFactory.IDENTITY;
}
}
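To see the repository end to end, here is a minimal self-contained sketch, assuming an HDFS namenode reachable at the defaults used above (hdfs://localhost:9000); error handling is trimmed for brevity:

```
import com.fr.plugin.hdfs.repository.core.HDFSResourceRepository;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.InputStream;
import java.net.URI;

public class HDFSRepositoryDemo {
    public static void main(String[] args) throws Exception {
        // Connect with the same defaults the factory uses: hdfs://localhost:9000
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), new Configuration());
        HDFSResourceRepository repo = new HDFSResourceRepository("HDFS_TEST", "/app", fs);

        repo.createDirectory("/app/demo");
        repo.write("/app/demo/hello.txt", "hello hdfs".getBytes("UTF-8"));

        InputStream in = repo.read("/app/demo/hello.txt");
        try {
            System.out.println("length: " + repo.length("/app/demo/hello.txt"));
        } finally {
            in.close();
        }
        repo.shutDown(); // closes the underlying FileSystem
    }
}
```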