commit e16ec06d0a19abeb613328bc01e002fc2b149a2d Author: rinoux Date: Fri Sep 7 16:18:29 2018 +0800 REPORT-10696 hdfs资源仓库插件上传 diff --git a/build.xml b/build.xml new file mode 100644 index 0000000..e0bf2d8 --- /dev/null +++ b/build.xml @@ -0,0 +1,120 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/img/db_fields.png b/img/db_fields.png new file mode 100644 index 0000000..e24b591 Binary files /dev/null and b/img/db_fields.png differ diff --git a/img/set.png b/img/set.png new file mode 100644 index 0000000..a2c16c7 Binary files /dev/null and b/img/set.png differ diff --git a/lib/hdfs-all-2.9.0.jar b/lib/hdfs-all-2.9.0.jar new file mode 100644 index 0000000..f458e5a Binary files /dev/null and b/lib/hdfs-all-2.9.0.jar differ diff --git a/plugin.xml b/plugin.xml new file mode 100644 index 0000000..5da4575 --- /dev/null +++ b/plugin.xml @@ -0,0 +1,20 @@ + + + com.fr.plugin.hdfs.repository + + yes + no + 1.0 + 10.0 + 2018-03-13 + rinoux + + + + com.fr.plugin + + + + + + \ No newline at end of file diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..ffdddbd --- /dev/null +++ b/readme.md @@ -0,0 +1,22 @@ +# 开发和测试使用说明 +- 1、hdfs仓库插件是平台文件服务器的扩展,首先需要正常方式安装插件; +- 2、在没有平台可视化设置之前,可以通过添加数据库字段来开启hdfs; +![字段](./img/db_fields.png) +> 上图中的ResourceModuleConfig.profiles.HDFS_TEST.workroots.[你的机器id(可以拷贝表中LOCAL_ENV的)]为hdfs的工作路径。 +> host设置为Hadoop的hdfs地址,比如默认localhost,port为hdfs端口(默认9000)。 +> 如果有用户组设置还需要对应用户名username。 +- 3、添加数据库字段设置仓库为hdfs: +![设置](./img/set.png) +- 4、启动设计器或者平台 + +- 5、其它说明: + +改数据库太麻烦,可以直接在适当的地方(比如com.fr.io.ResourceRepositoryActivator.doExtra()方法末尾)添加代码 +``` Map hdfsConfig = new HashMap(); + hdfsConfig.put("host", "localhost"); + hdfsConfig.put("port", 9000); + try { + ProfileFactory.create("HDFS", "HDFS_TEST", "/app", true, 
hdfsConfig).install().apply();
    } catch (RepositoryException e) {
        e.printStackTrace();
    }
\ No newline at end of file
diff --git a/src/com/fr/plugin/hdfs/repository/HDFSFactoryProvider.java b/src/com/fr/plugin/hdfs/repository/HDFSFactoryProvider.java
new file mode 100644
index 0000000..b5d9e22
--- /dev/null
+++ b/src/com/fr/plugin/hdfs/repository/HDFSFactoryProvider.java
@@ -0,0 +1,80 @@
package com.fr.plugin.hdfs.repository;

import com.fr.event.Event;
import com.fr.event.EventDispatcher;
import com.fr.event.Listener;
import com.fr.io.base.exception.RepositoryException;
import com.fr.io.base.provider.RepositoryFactoryProvider;
import com.fr.io.context.ResourceModuleContext;
import com.fr.io.context.info.InstalledComponent;
import com.fr.io.context.info.RepositoryApplyPolicy;
import com.fr.io.fun.AbstractRepositoryFactoryProvider;
import com.fr.log.FineLoggerFactory;
import com.fr.plugin.context.PluginContext;
import com.fr.plugin.hdfs.repository.core.HDFSConfig;
import com.fr.plugin.hdfs.repository.core.HDFSRepositoryFactory;
import com.fr.plugin.observer.PluginEventType;
import com.fr.plugin.transform.ExecuteFunctionRecord;
import com.fr.plugin.transform.FunctionRecorder;
import com.fr.stable.project.ProjectConstants;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Entry point that exposes the HDFS repository factory to the FR resource module.
 * <p>
 * Created by rinoux on 2018/8/10.
 */
@FunctionRecorder
public class HDFSFactoryProvider extends AbstractRepositoryFactoryProvider<HDFSConfig> {

    // Single shared factory instance. The factory holds no per-request state
    // (see HDFSRepositoryFactory), so sharing one final instance is safe.
    private static final HDFSRepositoryFactory factory = new HDFSRepositoryFactory();

    // NOTE(review): an earlier draft registered plugin lifecycle listeners here
    // (BeforeStop/BeforeUnInstall discarded and uninstalled HDFS repositories,
    // AfterActive/AfterInstall re-registered the factory via EventDispatcher).
    // That code was commented out by the author and has been removed as dead
    // code; recover it from VCS history if the lifecycle hooks are needed.

    /**
     * Returns the repository factory used to create HDFS-backed repositories.
     *
     * @return the shared {@link HDFSRepositoryFactory} instance
     */
    @Override
    @ExecuteFunctionRecord
    public RepositoryFactoryProvider<HDFSConfig> getFactory() {
        return factory;
    }
}
diff --git a/src/com/fr/plugin/hdfs/repository/core/HDFSConfig.java b/src/com/fr/plugin/hdfs/repository/core/HDFSConfig.java
new file mode 100644
index 0000000..3103ff5
--- /dev/null
+++ b/src/com/fr/plugin/hdfs/repository/core/HDFSConfig.java
@@ -0,0
+1,16 @@ +package com.fr.plugin.hdfs.repository.core; + +import com.fr.config.holder.Conf; +import com.fr.config.holder.factory.Holders; +import com.fr.io.config.CommonRepoConfig; + +/** + * Created by rinoux on 2018/8/10. + */ +public class HDFSConfig extends CommonRepoConfig { + public HDFSConfig() { + super(HDFSRepositoryFactory.IDENTITY); + } + + // TODO: 2018/8/10 config fields +} diff --git a/src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryFactory.java b/src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryFactory.java new file mode 100644 index 0000000..f35a834 --- /dev/null +++ b/src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryFactory.java @@ -0,0 +1,112 @@ +package com.fr.plugin.hdfs.repository.core; + +import com.fr.general.GeneralUtils; +import com.fr.io.base.provider.impl.ConfigRepositoryFactory; +import com.fr.io.context.info.RepositoryProfile; +import com.fr.io.repository.ResourceRepository; +import com.fr.log.FineLoggerFactory; +import com.fr.stable.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.net.URI; + +/** + * Created by rinoux on 2018/8/10. 
+ */ +public class HDFSRepositoryFactory extends ConfigRepositoryFactory { + static final String IDENTITY = "HDFS"; + private static final String HDFS_SCHEMA = "hdfs://"; + private static final String DEFAULT_HOST = "localhost"; + private static final String DEFAULT_PORT = "9000"; + + + public HDFSRepositoryFactory() { + super(IDENTITY); + } + + @Override + public Class> getProfileClass() { + return HDFSRepositoryProfile.class; + } + + @Override + public Class getConfigClass() { + return HDFSConfig.class; + } + + @Override + public boolean verifyConfig(HDFSConfig config) { + FileSystem fileSystem = null; + try { + fileSystem = createFileSystem(config); + return fileSystem.exists(new Path("/")); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage()); + } finally { + if (fileSystem != null) { + try { + fileSystem.close(); + } catch (IOException ignored) { + + } + } + } + return false; + } + + @Override + public ResourceRepository produce(String repoName, String workRoot, HDFSConfig config) { + return new HDFSResourceRepository(repoName, workRoot, createFileSystem(config)); + } + + + private FileSystem createFileSystem(HDFSConfig config) { + FileSystem fs; + Configuration conf = createConfiguration(); + + //根据配置生成url + String host = StringUtils.isEmpty(config.getHost()) ? DEFAULT_HOST : config.getHost(); + String port = config.getPort() == 0 ? 
DEFAULT_PORT : String.valueOf(config.getPort()); + + String hdfsUrl = HDFS_SCHEMA.concat(host).concat(":").concat(port); + String user = config.getUsername(); + try { + if (StringUtils.isNotEmpty(user)) { + fs = FileSystem.get(URI.create(hdfsUrl), conf, user); + } else { + fs = FileSystem.get(URI.create(hdfsUrl), conf); + } + + + return fs; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + /** + * 初始化hadoop-hdfs配置 + * + * @return 配置 + */ + private Configuration createConfiguration() { + Configuration conf = new Configuration(true); + + //使用GeneralUtils才能加载到FileSystem实现类 + conf.setClassLoader(new ClassLoader() { + @Override + public Class loadClass(String name) throws ClassNotFoundException { + return GeneralUtils.classForName(name); + } + }); + + //设置hdfs协议实现 + conf.setClass("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class, FileSystem.class); + + return conf; + } +} diff --git a/src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryProfile.java b/src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryProfile.java new file mode 100644 index 0000000..ae5c258 --- /dev/null +++ b/src/com/fr/plugin/hdfs/repository/core/HDFSRepositoryProfile.java @@ -0,0 +1,37 @@ +package com.fr.plugin.hdfs.repository.core; + +import com.fr.config.Identifier; +import com.fr.config.holder.Conf; +import com.fr.config.holder.factory.Holders; +import com.fr.io.context.info.RepositoryProfile; + +/** + * Created by rinoux on 2018/8/10. 
+ */ +public class HDFSRepositoryProfile extends RepositoryProfile { + + public HDFSRepositoryProfile() { + } + + @Identifier("hdfsConfig") + private Conf hdfsConfig = Holders.obj(null, HDFSConfig.class); + + @Override + public HDFSConfig getConfig() { + return hdfsConfig.get(); + } + + @Override + public void setConfig(HDFSConfig config) { + this.hdfsConfig.set(config); + } + + + @Override + public RepositoryProfile clone() throws CloneNotSupportedException { + HDFSRepositoryProfile cloned = (HDFSRepositoryProfile) super.clone(); + cloned.hdfsConfig = (Conf) this.hdfsConfig.clone(); + + return cloned; + } +} diff --git a/src/com/fr/plugin/hdfs/repository/core/HDFSResourceRepository.java b/src/com/fr/plugin/hdfs/repository/core/HDFSResourceRepository.java new file mode 100644 index 0000000..eaeb07a --- /dev/null +++ b/src/com/fr/plugin/hdfs/repository/core/HDFSResourceRepository.java @@ -0,0 +1,299 @@ +package com.fr.plugin.hdfs.repository.core; + +import com.fr.io.base.exception.ConnectionException; +import com.fr.io.repository.FineFileEntry; +import com.fr.io.repository.base.BaseResourceRepository; +import com.fr.log.FineLoggerFactory; +import com.fr.stable.Filter; +import com.fr.workspace.resource.ResourceIOException; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ConnectException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by rinoux on 2018/8/10. 
+ */ +public class HDFSResourceRepository extends BaseResourceRepository { + private FileSystem fs; + + + public HDFSResourceRepository(String repoName, String workRoot, FileSystem fs) { + super(repoName, workRoot); + this.fs = fs; + } + + + @Override + public String getSeparator() { + return "/"; + } + + @Override + public FineFileEntry getEntry(String path) { + try { + FileStatus status = fs.getFileStatus(new Path(path)); + + return convertToFineFileEntry(status); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + return null; + } + } + + + private FineFileEntry convertToFineFileEntry(FileStatus status) { + FineFileEntry entry = new FineFileEntry(status.getPath().toUri().getPath()); + + entry.setDirectory(status.isDirectory()); + entry.setSize(status.getLen()); + entry.setTimestamp(status.getModificationTime()); + entry.setName(status.getPath().getName()); + + + return entry; + } + + @Override + public FineFileEntry[] listEntry(String dir) { + List entryList = new ArrayList(); + try { + FileStatus[] statuses = fs.listStatus(new Path(dir)); + + for (FileStatus status : statuses) { + entryList.add(convertToFineFileEntry(status)); + } + + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + return entryList.toArray(new FineFileEntry[0]); + } + + @Override + public URL getResource(String path) { + try { + FileStatus status = fs.getFileStatus(new Path(path)); + + return new URL(status.getPath().toUri().toString()); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + + return null; + } + + @Override + public InputStream read(String file) throws ResourceIOException { + try { + return fs.open(new Path(file)); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + return 
null; + } + } + + @Override + public void write(String path, byte[] data) { + OutputStream os = null; + try { + os = fs.create(new Path(path), true); + os.write(data); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } finally { + try { + if (os != null) { + os.flush(); + os.close(); + } + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + } + } + + @Override + public boolean createFile(String path) { + try { + return fs.createNewFile(new Path(path)); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + return false; + } + + @Override + public boolean createDirectory(String path) { + try { + return fs.mkdirs(new Path(path)); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + + return false; + } + + @Override + public boolean delete(String path) { + try { + return fs.delete(new Path(path), true); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + + return false; + } + + @Override + public boolean exist(String path) { + try { + return fs.exists(new Path(path)); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage()); + } + + return false; + } + + @Override + public String[] list(String dir, final Filter filter) { + List result = new ArrayList(); + try { + FileStatus[] statuses = fs.listStatus(new Path(dir), new PathFilter() { + @Override + public boolean accept(Path path) { + return filter.accept(path.getName()); + } + }); + + for (FileStatus status : statuses) { + result.add(status.getPath().getName()); + } + + 
} catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage()); + } + return result.toArray(new String[0]); + } + + @Override + public boolean isDirectory(String path) { + try { + return fs.isDirectory(new Path(path)); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + + return false; + } + + @Override + public long lastModified(String path) { + try { + return fs.getFileStatus(new Path(path)).getModificationTime(); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + + return 0L; + } + + @Override + public long length(String path) { + try { + return fs.getFileStatus(new Path(path)).getLen(); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + + return 0L; + } + + + @Override + public boolean rename(String path, String newPath) throws ResourceIOException { + try { + return fs.rename(new Path(path), new Path(newPath)); + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage()); + } + + return false; + } + + + @Override + public void appendWrite(String file, byte[] data) throws ResourceIOException { + FSDataOutputStream out = null; + try { + out = fs.append(new Path(file)); + out.write(data); + + } catch (ConnectException e) { + throw new ConnectionException(); + } catch (IOException e) { + throw new ResourceIOException(e); + } finally { + if (out != null) { + try { + out.flush(); + out.close(); + } catch (IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage()); + } + } + } + } + + + @Override + public void shutDown() { + try { + fs.close(); + } catch 
(IOException e) { + FineLoggerFactory.getLogger().error(e.getMessage(), e); + } + } + + @Override + public String getIdentity() { + return HDFSRepositoryFactory.IDENTITY; + } + +}