交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第1 章:FileSystem的创建
分享
未结
0
967
李延
LV6
2022-03-19
悬赏:20积分
# 1. 客户端的使用 ```java Configuration configuration = new Configuration(); fileSystem = FileSystem.get(configuration); ``` 我们通过创建FileSystem 对象来使用hdfs的客户端。 但 FileSystem 有很多的实现类: 那么我们具体创建的是哪个实现类。 # 2. 源码分析 我们看一下FileSystem的get方法 ## 2.1 get ```java public static FileSystem get(Configuration conf) throws IOException { return get(getDefaultUri(conf), conf); } ``` 这里调用了重载的方法,其他通过getDefaultUri获取了URI ```java public static URI getDefaultUri(Configuration conf) { URI uri = URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS))); if (uri.getScheme() == null) { throw new IllegalArgumentException("No scheme in default FS: " + uri); } return uri; } ``` 这里我们看到url读取的是FS_DEFAULT_NAME_KEY 也就是 配置文件中的`fs.defaultFS`。 而当我们没有配置文件时,默认使用的是`file:///` 也就是本地的uri。 我们继续get 方法 ```java public static FileSystem get(URI uri, Configuration conf) throws IOException { String scheme = uri.getScheme(); String authority = uri.getAuthority(); //没有schema时,使用默认的 if (scheme == null && authority == null) { // use default FS return get(conf); } //当没有host时,使用默认的host if (scheme != null && authority == null) { // no authority URI defaultUri = getDefaultUri(conf); if (scheme.equals(defaultUri.getScheme()) // if scheme matches default && defaultUri.getAuthority() != null) { // & default has authority return get(defaultUri, conf); // return default } } //是否使用缓存,通过配置:fs.%s.impl.disable.cache 。其中%s 为不同的协议,可以是hdfs。 String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); if (conf.getBoolean(disableCacheName, false)) { LOGGER.debug("Bypassing cache to create filesystem {}", uri); return createFileSystem(uri, conf); } return CACHE.get(uri, conf); } ``` 上面主要是添加了一个FileSystem的缓存,我们可以通过配置fs.hdfs.impl.disable.cache 来设置 ## 2.2 createFileSystem ```java private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException { Tracer tracer = FsTracer.get(conf); try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem"); DurationInfo ignored = new DurationInfo(LOGGER, false, "Creating FS %s", uri)) { scope.addKVAnnotation("scheme", uri.getScheme()); //通过uri的schema 获取对应的FileSystem 实现类 Class<? extends FileSystem> clazz = getFileSystemClass(uri.getScheme(), conf); FileSystem fs = ReflectionUtils.newInstance(clazz, conf); try { //初始化 fs fs.initialize(uri, conf); } catch (IOException | RuntimeException e) { // exception raised during initialization. // log summary at warn and full stack at debug LOGGER.warn("Failed to initialize fileystem {}: {}", uri, e.toString()); LOGGER.debug("Failed to initialize fileystem", e); // then (robustly) close the FS, so as to invoke any // cleanup code. IOUtils.cleanupWithLogger(LOGGER, fs); throw e; } return fs; } } ``` 这里我们看到,其实FileSystem 使用哪个其实取决于我们uri的协议。我们先看一下它是如何获取的 ## 2.3 getFileSystemClass ```java public static Class<? extends FileSystem> getFileSystemClass(String scheme, Configuration conf) throws IOException { //加载所有的fs 对象。 if (!FILE_SYSTEMS_LOADED) { loadFileSystems(); } LOGGER.debug("Looking for FS supporting {}", scheme); Class<? extends FileSystem> clazz = null; // 判断配置文件中是否有指定。 if (conf != null) { String property = "fs." + scheme + ".impl"; LOGGER.debug("looking for configuration option {}", property); clazz = (Class<? extends FileSystem>) conf.getClass( property, null); } else { LOGGER.debug("No configuration: skipping check for fs.{}.impl", scheme); } //通过schema 获取 if (clazz == null) { LOGGER.debug("Looking in service filesystems for implementation class"); clazz = SERVICE_FILE_SYSTEMS.get(scheme); } else { LOGGER.debug("Filesystem {} defined in configuration option", scheme); } if (clazz == null) { throw new UnsupportedFileSystemException("No FileSystem for scheme " + "\"" + scheme + "\""); } LOGGER.debug("FS for {} is {}", scheme, clazz); return clazz; } ``` 这里我们看到首先它是通过loadFileSystems 加载了所有的fs实现,将它放到SERVICE_FILE_SYSTEMS 这个map中。 之后根据配置文件或者uri的schema 来判断。 我们看一下它是如何加载的 ```java private static void loadFileSystems() { LOGGER.debug("Loading filesystems"); synchronized (FileSystem.class) { if (!FILE_SYSTEMS_LOADED) { // 通过ServiceLoader ,与jdbc 字段是驱动一致 ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class); Iterator<FileSystem> it = serviceLoader.iterator(); while (it.hasNext()) { FileSystem fs; try { fs = it.next(); try { SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("{}:// = {} from {}", fs.getScheme(), fs.getClass(), ClassUtil.findContainingJar(fs.getClass())); } } catch (Exception e) { LOGGER.warn("Cannot load: {} from {}", fs, ClassUtil.findContainingJar(fs.getClass())); LOGGER.info("Full exception loading: {}", fs, e); } } catch (ServiceConfigurationError ee) { LOG.warn("Cannot load filesystem: " + ee); Throwable cause = ee.getCause(); // print all the nested exception messages while (cause != null) { LOG.warn(cause.toString()); cause = cause.getCause(); } // and at debug: the full stack LOG.debug("Stack Trace", ee); } } FILE_SYSTEMS_LOADED = true; } } } ``` 这里与jdbc的加载驱动基本一样,我们可以看一下hdfs的jar内配置文件,他在common和hdfs-client中都有 org.apache.hadoop.fs.LocalFileSystem org.apache.hadoop.fs.viewfs.ViewFileSystem org.apache.hadoop.fs.HarFileSystem org.apache.hadoop.fs.http.HttpFileSystem org.apache.hadoop.fs.http.HttpsFileSystem org.apache.hadoop.hdfs.DistributedFileSystem org.apache.hadoop.hdfs.web.WebHdfsFileSystem org.apache.hadoop.hdfs.web.SWebHdfsFileSystem 一共有8个不同实现,其中DistributedFileSystem 就是我们hdfs 的实现 ```java public static final String HDFS_URI_SCHEME = "hdfs"; @Override public String getScheme() { return HdfsConstants.HDFS_URI_SCHEME; } ```
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号