Chapter 5: HDFS Client HA Connections via NameNodeProxiesClient
李延
2022-03-24
# 1. HA configuration

To connect to an HA NameNode, the client configuration must include the HA settings so that it can connect by the cluster's logical name. fs.defaultFS must also be set to that cluster name. So how does this configuration get loaded? Let's walk through it.

# 2. Creating the HA client proxy

From the earlier analysis we know that NameNode requests are ultimately sent to the server over RPC by ClientNamenodeProtocolTranslatorPB. When DFSClient is created there is an HA check. Let's revisit it:

```java
public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(
    Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  // Check whether an HA proxy provider class is configured
  AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider =
      createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
          true, fallbackToSimpleAuth);

  // No provider: create a direct (non-HA) connection
  if (failoverProxyProvider == null) {
    // Resolve the NameNode address
    InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
    Text dtService = SecurityUtil.buildTokenService(nnAddr);
    // Create the connection
    ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf,
        UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
    return new ProxyAndInfo<>(proxy, dtService, nnAddr);
  } else {
    return createHAProxy(conf, nameNodeUri, ClientProtocol.class,
        failoverProxyProvider);
  }
}
```

The method first obtains a failover proxy provider by calling createFailoverProxyProvider. The overload shown below takes one more parameter than the call above, an HAProxyFactory:

```java
protected static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
    Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
    AtomicBoolean fallbackToSimpleAuth, HAProxyFactory<T> proxyFactory)
    throws IOException {
  Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
  AbstractNNFailoverProxyProvider<T> providerNN;
  try {
    // Obtain the class of the proxy provider
    // (the class configured under
    // dfs.client.failover.proxy.provider.[nameservice ID])
    failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
        nameNodeUri);
    if (failoverProxyProviderClass == null) {
      return null;
    }
    // Create a proxy provider instance.
    // (instantiated via reflection)
    Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
        .getConstructor(Configuration.class, URI.class,
            Class.class, HAProxyFactory.class);
    FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
        xface, proxyFactory);

    // If the proxy provider is of an old implementation, wrap it.
    if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
      providerNN = new WrappedFailoverProxyProvider<>(provider);
    } else {
      providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
    }
  } catch (Exception e) {
    final String message = "Couldn't create proxy provider " +
        failoverProxyProviderClass;
    LOG.debug(message, e);
    if (e.getCause() instanceof IOException) {
      throw (IOException) e.getCause();
    } else {
      throw new IOException(message, e);
    }
  }

  // Check the port in the URI, if it is logical.
  if (checkPort && providerNN.useLogicalURI()) {
    int port = nameNodeUri.getPort();
    if (port > 0 &&
        port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) {
      // Throwing here without any cleanup is fine since we have not
      // actually created the underlying proxies yet.
      throw new IOException("Port " + port + " specified in URI "
          + nameNodeUri + " but host '" + nameNodeUri.getHost()
          + "' is a logical (HA) namenode"
          + " and does not use port information.");
    }
  }
  providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
  return providerNN;
}
```

This is simply the process of creating the corresponding instance from configuration. Now back to createHAProxy:

```java
public static <T> ProxyAndInfo<T> createHAProxy(
    Configuration conf, URI nameNodeUri, Class<T> xface,
    AbstractNNFailoverProxyProvider<T> failoverProxyProvider) {
  Preconditions.checkNotNull(failoverProxyProvider);
  // HA case
  DfsClientConf config = new DfsClientConf(conf);
  T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
      RetryPolicies.failoverOnNetworkException(
          RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
          config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
          config.getFailoverSleepMaxMillis()));

  Text dtService;
  if (failoverProxyProvider.useLogicalURI()) {
    dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
        HdfsConstants.HDFS_URI_SCHEME);
  } else {
    dtService = SecurityUtil.buildTokenService(
        DFSUtilClient.getNNAddress(nameNodeUri));
  }
  return new ProxyAndInfo<>(proxy, dtService,
      DFSUtilClient.getNNAddressCheckLogical(conf, nameNodeUri));
}

// RetryProxy.create, called from createHAProxy above
public static <T> Object create(Class<T> iface,
    FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
  return Proxy.newProxyInstance(
      proxyProvider.getInterface().getClassLoader(),
      new Class<?>[] { iface },
      new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
      );
}
```

The proxyProvider we just created is passed to RetryInvocationHandler, which is the invocation handler behind the proxy the client finally uses. Roughly, the two relate as follows:

- RetryInvocationHandler: the retry-capable client side. When a request fails, it obtains a new connection and sends the request again.
- ConfiguredFailoverProxyProvider: the provider we configured, i.e. the HA provider. It keeps supplying RetryInvocationHandler with new RPC connections.

# 3. ConfiguredFailoverProxyProvider

The parent class of ConfiguredFailoverProxyProvider is AbstractNNFailoverProxyProvider. As noted above, its main job is to keep producing new RPC proxies.

## 3.1 Constructor

```java
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
    Class<T> xface, HAProxyFactory<T> factory) {
  this(conf, uri, xface, factory, DFS_NAMENODE_RPC_ADDRESS_KEY);
}

public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
    Class<T> xface, HAProxyFactory<T> factory, String addressKey) {
  super(conf, uri, xface, factory);
  this.proxies = getProxyAddresses(uri, addressKey);
}
```

getProxyAddresses loads the addresses of all NameNodes listed in the configuration:

```java
protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey) {
  final List<NNProxyInfo<T>> proxies = new ArrayList<NNProxyInfo<T>>();
  Map<String, Map<String, InetSocketAddress>> map =
      DFSUtilClient.getAddresses(conf, null, addressKey);
  // Look up the addresses for the nameservice named by the URI host
  Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());

  if (addressesInNN == null || addressesInNN.size() == 0) {
    throw new RuntimeException("Could not find any configured addresses " +
        "for URI " + uri);
  }

  // Take the loaded addresses as a collection
  Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
  try {
    addressesOfNns = getResolvedHostsIfNecessary(addressesOfNns, uri);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  for (InetSocketAddress address : addressesOfNns) {
    proxies.add(new NNProxyInfo<T>(address));
  }
  // Randomize the list to prevent all clients pointing to the same one
  boolean randomized = getRandomOrder(conf, uri);
  if (randomized) {
    Collections.shuffle(proxies);
  }

  // The client may have a delegation token set for the logical
  // URI of the cluster. Clone this token to apply to each of the
  // underlying IPC addresses so that the IPC code can find it.
  HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
  return proxies;
}
```

## 3.2 getProxy

```java
@Override
public synchronized ProxyInfo<T> getProxy() {
  NNProxyInfo<T> current = proxies.get(currentProxyIndex);
  return createProxyIfNeeded(current);
}
```

getProxy uses currentProxyIndex to decide which address to use. createProxyIfNeeded is implemented in the parent class and follows the same process as the non-HA (single NameNode) case:

```java
protected NNProxyInfo<T> createProxyIfNeeded(NNProxyInfo<T> pi) {
  if (pi.proxy == null) {
    assert pi.getAddress() != null : "Proxy address is null";
    try {
      pi.proxy = factory.createProxy(conf,
          pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
    } catch (IOException ioe) {
      LOG.error("{} Failed to create RPC proxy to NameNode at {}",
          this.getClass().getSimpleName(), pi.address, ioe);
      throw new RuntimeException(ioe);
    }
  }
  return pi;
}
```

So what we need to look at is how currentProxyIndex changes.

## 3.3 incrementProxyIndex

incrementProxyIndex shows how currentProxyIndex is switched: it is plain round-robin.

```java
synchronized void incrementProxyIndex() {
  currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
}
```

# 4. RetryInvocationHandler

Having covered ConfiguredFailoverProxyProvider, let's see when its methods are called from RetryInvocationHandler.

RetryInvocationHandler is the handler of a JDK dynamic proxy, so its invoke method runs whenever we call a NameNode method:

```java
public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
  final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
  final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;

  final Call call = newCall(method, args, isRpc, callId);
  while (true) {
    final CallReturn c = call.invokeOnce();
    final CallReturn.State state = c.getState();
    if (state == CallReturn.State.ASYNC_INVOKED) {
      return null; // return null for async calls
    } else if (c.getState() != CallReturn.State.RETRY) {
      return c.getReturnValue();
    }
  }
}
```

This is a loop around call.invokeOnce(). It keeps going until the call stops returning RETRY (async calls return null right away).

```java
synchronized CallReturn invokeOnce() {
  try {
    if (retryInfo != null) {
      return processWaitTimeAndRetryInfo();
    }

    // The number of times this invocation handler has ever been failed over
    // before this method invocation attempt. Used to prevent concurrent
    // failed method invocations from triggering multiple failover attempts.
    final long failoverCount = retryInvocationHandler.getFailoverCount();
    try {
      // Invoke the RPC
      return invoke();
    } catch (Exception e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(toString(), e);
      }
      if (Thread.currentThread().isInterrupted()) {
        // If interrupted, do not retry.
        throw e;
      }
      retryInfo = retryInvocationHandler.handleException(
          method, callId, retryPolicy, counters, failoverCount, e);
      // Switch connections
      return processWaitTimeAndRetryInfo();
    }
  } catch(Throwable t) {
    return new CallReturn(t);
  }
}
```

When invoke throws, processWaitTimeAndRetryInfo is called:

```java
CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException {
  final Long waitTime = getWaitTime(Time.monotonicNow());
  LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}",
      callId, retryInfo, waitTime);
  if (waitTime != null && waitTime > 0) {
    try {
      Thread.sleep(retryInfo.delay);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Interrupted while waiting to retry", e);
      }
      InterruptedIOException intIOE = new InterruptedIOException(
          "Retry interrupted");
      intIOE.initCause(e);
      throw intIOE;
    }
  }
  processRetryInfo();
  return CallReturn.RETRY;
}

synchronized void processRetryInfo() {
  counters.retries++;
  if (retryInfo.isFailover()) {
    retryInvocationHandler.proxyDescriptor.failover(
        retryInfo.expectedFailoverCount, method, callId);
    counters.failovers++;
  }
  retryInfo = null;
}

// Reached through retryInvocationHandler.proxyDescriptor
synchronized void failover(long expectedFailoverCount, Method method,
    int callId) {
  // Make sure that concurrent failed invocations only cause a single
  // actual failover.
  if (failoverCount == expectedFailoverCount) {
    fpp.performFailover(proxyInfo.proxy);
    failoverCount++;
  } else {
    LOG.warn("A failover has occurred since the start of call #" + callId
        + " " + proxyInfo.getString(method.getName()));
  }
  proxyInfo = fpp.getProxy();
}
```

Only in failover do we finally see the two calls we were looking for: performFailover, which switches the proxy, and getProxy, which fetches the new one. In ConfiguredFailoverProxyProvider, performFailover calls incrementProxyIndex, so the getProxy call that follows returns the next NameNode in the list.
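To tie sections 3 and 4 together, here is a minimal, self-contained sketch of the same pattern: a round-robin provider sitting behind a JDK dynamic proxy that retries after a failover. It is a toy model, not Hadoop code. FailoverSketch, NameService, RoundRobinProvider, RetryHandler and demo are hypothetical names made up for illustration, and the retry rule (fail over on any IOException, up to a fixed number of attempts) is a simplifying assumption that stands in for the real RetryPolicy.

```java
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

// Toy model only: none of these types are Hadoop classes.
final class FailoverSketch {

  /** Hypothetical stand-in for ClientProtocol. */
  interface NameService {
    String whoAmI() throws IOException;
  }

  /** Plays the role of ConfiguredFailoverProxyProvider: round-robin over targets. */
  static final class RoundRobinProvider<T> {
    private final List<T> targets;
    private int currentIndex = 0;

    RoundRobinProvider(List<T> targets) {
      this.targets = targets;
    }

    synchronized T getProxy() {
      return targets.get(currentIndex);
    }

    synchronized void performFailover() {
      currentIndex = (currentIndex + 1) % targets.size();
    }
  }

  /** Plays the role of RetryInvocationHandler: fail over on IOException, then retry. */
  static final class RetryHandler<T> implements InvocationHandler {
    private final RoundRobinProvider<T> provider;
    private final int maxFailovers;
    private long failoverCount = 0;

    RetryHandler(RoundRobinProvider<T> provider, int maxFailovers) {
      this.provider = provider;
      this.maxFailovers = maxFailovers;
    }

    private synchronized long getFailoverCount() {
      return failoverCount;
    }

    // Only a caller whose snapshot still matches the counter switches targets,
    // so concurrent failures cause a single failover.
    private synchronized void failover(long expected) {
      if (failoverCount == expected) {
        provider.performFailover();
        failoverCount++;
      }
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      for (int attempt = 0; ; attempt++) {
        long seen = getFailoverCount();
        try {
          return method.invoke(provider.getProxy(), args);
        } catch (InvocationTargetException e) {
          Throwable cause = e.getCause();
          if (!(cause instanceof IOException) || attempt >= maxFailovers) {
            throw cause; // not retriable, or out of attempts
          }
          failover(seen);
        }
      }
    }
  }

  @SuppressWarnings("unchecked")
  static <T> T create(Class<T> iface, RoundRobinProvider<T> provider, int maxFailovers) {
    return (T) Proxy.newProxyInstance(iface.getClassLoader(),
        new Class<?>[] {iface}, new RetryHandler<>(provider, maxFailovers));
  }

  /** First target always fails (like an unreachable NameNode); second answers. */
  static String demo() throws IOException {
    NameService down = () -> { throw new IOException("connection refused"); };
    NameService up = () -> "nn2";
    NameService client = create(NameService.class,
        new RoundRobinProvider<>(Arrays.asList(down, up)), 3);
    return client.whoAmI();
  }

  public static void main(String[] args) throws IOException {
    System.out.println(demo()); // prints "nn2"
  }
}
```

The caller only ever holds the proxy returned by create. The switch from the failing target to the working one happens inside the handler, which is why HDFS client code can address the cluster by its logical name without knowing which NameNode is currently active.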