交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第2 章:DFSClient
分享
未结
0
806
李延
LV6
2022-03-20
悬赏:20积分
# 1 创建 前面我们分析到hdfs最后使用的FileSystem是DistributedFileSystem对象,而在它的inti方法中我们看到: ```java public void initialize(URI uri, Configuration conf) throws IOException { super.initialize(uri, conf); setConf(conf); String host = uri.getHost(); if (host == null) { throw new IOException("Incomplete HDFS URI, no host: "+ uri); } initDFSClient(uri, conf); this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority()); this.workingDir = getHomeDirectory(); storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE .put(DFSOpsCountStatistics.NAME, new StorageStatisticsProvider() { @Override public StorageStatistics provide() { return new DFSOpsCountStatistics(); } }); } void initDFSClient(URI theUri, Configuration conf) throws IOException { this.dfs = new DFSClient(theUri, conf, statistics); } ``` 这里我们看到创建了DFSClient 对象。 # 2 ClientProtocol 创建 在DFSClient 构造方法中代码非常多,这里我们主要关注ClientProtocol的创建 ```java public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { Preconditions.checkArgument(nameNodeUri != null, "null URI"); proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, nameNodeUri, nnFallbackToSimpleAuth); this.dtService = proxyInfo.getDelegationTokenService(); this.namenode = proxyInfo.getProxy(); } ``` 我们看到调用了NameNodeProxiesClient.createProxyWithClientProtocol 方法 ```java public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol( Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) throws IOException { //判断是否有ha 代理累 AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, true, fallbackToSimpleAuth); //没有代理直接创建连接 if (failoverProxyProvider == null) { //获取连接地址 InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri); Text dtService = SecurityUtil.buildTokenService(nnAddr); //创建连接 ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); return new ProxyAndInfo<>(proxy, dtService, nnAddr); } else { return createHAProxy(conf, nameNodeUri, ClientProtocol.class, failoverProxyProvider); } } ``` 这里有两种不同模式 - 为HA 连接,如果配置了dfs.client.failover.proxy.provider.ID 的变量,就使用代理类进行连接 - 否则根据host创建ClientProtocol 对象 这里我们先关注非HA的模式,HA单独再说明 ```java public static ClientProtocol createProxyWithAlignmentContext( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) throws IOException { // 设置IPC 配置,指定ProtobufRpcEngine2 解析器 RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class); final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( conf, HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT, SafeModeException.class.getName()); //创建ClientNamenodeProtocolPB的RPC代理 final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy, fallbackToSimpleAuth, alignmentContext).getProxy(); //有代理模式 if (withRetries) { // create the proxy with retries Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>(); ClientProtocol translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy); return (ClientProtocol) RetryProxy.create( ClientProtocol.class, new DefaultFailoverProxyProvider<>(ClientProtocol.class, translatorProxy), methodNameToPolicyMap, defaultPolicy); } else { //通过ClientNamenodeProtocolTranslatorPB 进行包装 return new ClientNamenodeProtocolTranslatorPB(proxy); } } ``` 这里我们主要关注两个类的创建ClientNamenodeProtocolPB、ClientNamenodeProtocolTranslatorPB。 他们都是ClientProtocol 子类,区别为: - ClientNamenodeProtocolPB 是通过RPC创建的代理,我们发送给nameNode的命令,都是通过这个代理进行发送,但它的参数与返回值都是统一的类型。无论我们使用客户端还是http都是通过这个类来实现实现的。 - ClientNamenodeProtocolTranslatorPB 是ClientNamenodeProtocolPB的适配器,它可以将我们客户端的参数统一适配为ClientNamenodeProtocolPB需要的参数。 ## 3 总结 DFSClient 是 nameNode 的客户端,而我们最终发给服务器的命令都是通过ClientNamenodeProtocolPB 以RPC的方式进行发送的。
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号