Chapter 4-2: Hadoop RPC Client Analysis
李延
2022-03-19
# 1. Overview

The client side breaks down into the following parts:

- sending and receiving TCP data
- parsing data packets
- generating the RPC proxy class

We go through each of these below.

# 2. Proxy class

We start from the method we actually call:

```java
final RPCProtocolTest localhost = RPC.getProxy(RPCProtocolTest.class, RPCProtocolTest.versionID,
    new InetSocketAddress("localhost", 8080), new Configuration());
```

Calling getProxy generates the RPC proxy object. Stepping into it, we see:

```java
public static <T> T getProxy(Class<T> protocol,
                             long clientVersion,
                             InetSocketAddress addr, Configuration conf)
    throws IOException {
  return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}
```

And in getProtocolProxy:

```java
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                                    long clientVersion,
                                                    InetSocketAddress addr, Configuration conf)
    throws IOException {
  return getProtocolProxy(protocol, clientVersion, addr, conf,
      NetUtils.getDefaultSocketFactory(conf));
}
```

The last argument is a SocketFactory, the factory that produces Socket objects. We skip the intermediate calls and go straight to the method that is reached at the end:

```java
// return the RpcEngine configured to handle a protocol
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
                                                Configuration conf) {
  RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
  if (engine == null) {
    Class<?> impl = conf.getClass(ENGINE_PROP + "." + protocol.getName(),
        WritableRpcEngine.class);
    engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
    PROTOCOL_ENGINES.put(protocol, engine);
  }
  return engine;
}
```

The default engine is WritableRpcEngine, the same one we saw earlier on the server side. Next, WritableRpcEngine's getProxy method:

```java
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                       InetSocketAddress addr, UserGroupInformation ticket,
                       Configuration conf, SocketFactory factory,
                       int rpcTimeout, RetryPolicy connectionRetryPolicy,
                       AtomicBoolean fallbackToSimpleAuth,
                       AlignmentContext alignmentContext)
    throws IOException {

  if (connectionRetryPolicy != null) {
    throw new UnsupportedOperationException(
        "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
  }

  T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
      new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
          factory, rpcTimeout, fallbackToSimpleAuth, alignmentContext));
  return new ProtocolProxy<T>(protocol, proxy, true);
}
```

This is the JDK dynamic proxy, and the invocation handler behind the proxy is the Invoker class. Here is its constructor:

```java
public Invoker(Class<?> protocol,
               InetSocketAddress address, UserGroupInformation ticket,
               Configuration conf, SocketFactory factory,
               int rpcTimeout, AtomicBoolean fallbackToSimpleAuth,
               AlignmentContext alignmentContext)
    throws IOException {
  this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
      ticket, rpcTimeout, null, conf);
  this.client = CLIENTS.getClient(conf, factory);
  this.fallbackToSimpleAuth = fallbackToSimpleAuth;
  this.alignmentContext = alignmentContext;
}
```

The constructor obtains a client object, which is what handles the network communication.

Now back to Invoker's invoke method, which is what runs whenever we call a method on the proxy:

```java
@Override
public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
  long startTime = 0;
  if (LOG.isDebugEnabled()) {
    startTime = Time.monotonicNow();
  }

  // if Tracing is on then start a new span for this rpc.
  // guard it in the if statement to make sure there isn't
  // any extra string manipulation.
  Tracer tracer = Tracer.curThreadTracer();
  TraceScope traceScope = null;
  if (tracer != null) {
    traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
  }
  ObjectWritable value;
  try {
    value = (ObjectWritable)
        client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
            remoteId, fallbackToSimpleAuth, alignmentContext);
  } finally {
    if (traceScope != null) traceScope.close();
  }
  if (LOG.isDebugEnabled()) {
    long callTime = Time.monotonicNow() - startTime;
    LOG.debug("Call: " + method.getName() + " " + callTime);
  }
  return value.get();
}
```

In the end it delegates to client.call.

# 3. Client

As shown above, RPC does its network communication through Client. We look at the constructor first:

```java
public Client(Class<? extends Writable> valueClass, Configuration conf,
    SocketFactory factory) {
  this.valueClass = valueClass;
  this.conf = conf;
  this.socketFactory = factory;
  this.connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
      CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
  this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
      CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
  this.bindToWildCardAddress = conf
      .getBoolean(CommonConfigurationKeys.IPC_CLIENT_BIND_WILDCARD_ADDR_KEY,
          CommonConfigurationKeys.IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT);

  this.clientId = ClientId.getClientId();
  this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
  this.maxAsyncCalls = conf.getInt(
      CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
      CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
}
```

Most of it reads network-connection settings from the configuration. It also obtains a client id and the executor used for sending request parameters.

## 3.1 The call method

The call invoked from Invoker eventually reaches this overload:

```java
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, int serviceClass,
    AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
    throws IOException {
  // Wrap the request in a Call object
  final Call call = createCall(rpcKind, rpcRequest);
  call.setAlignmentContext(alignmentContext);
  // Get (or create) the connection and register the call on it
  final Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);

  try {
    checkAsyncCall();
    try {
      // Send the rpc request; the response is received asynchronously
      connection.sendRpcRequest(call);
    } catch (RejectedExecutionException e) {
      throw new IOException("connection has been closed", e);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      IOException ioe = new InterruptedIOException(
          "Interrupted waiting to send RPC request to server");
      ioe.initCause(ie);
      throw ioe;
    }
  } catch (Exception e) {
    if (isAsynchronousMode()) {
      releaseAsyncCall();
    }
    throw e;
  }

  if (isAsynchronousMode()) {
    final AsyncGet<Writable, IOException> asyncGet
        = new AsyncGet<Writable, IOException>() {
      @Override
      public Writable get(long timeout, TimeUnit unit)
          throws IOException, TimeoutException {
        boolean done = true;
        try {
          final Writable w = getRpcResponse(call, connection, timeout, unit);
          if (w == null) {
            done = false;
            throw new TimeoutException(call + " timed out "
                + timeout + " " + unit);
          }
          return w;
        } finally {
          if (done) {
            releaseAsyncCall();
          }
        }
      }

      @Override
      public boolean isDone() {
        synchronized (call) {
          return call.done;
        }
      }
    };

    ASYNC_RPC_RESPONSE.set(asyncGet);
    return null;
  } else {
    // Block until the request completes, then fetch the return value
    return getRpcResponse(call, connection, -1, null);
  }
}
```

This method covers essentially the whole life cycle of a request. We look at the individual steps next.

## 3.2 getConnection

This step creates (or reuses) the connection and registers the call on it.

```java
private Connection getConnection(ConnectionId remoteId,
    Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  final InetSocketAddress address = remoteId.getAddress();
  if (address.isUnresolved()) {
    throw NetUtils.wrapException(address.getHostName(),
        address.getPort(),
        null,
        0,
        new UnknownHostException());
  }

  final Consumer<Connection> removeMethod = c -> {
    final boolean removed = connections.remove(remoteId, c);
    if (removed && connections.isEmpty()) {
      synchronized (emptyCondition) {
        emptyCondition.notify();
      }
    }
  };

  Connection connection;
  /* we could avoid this allocation for each RPC by having a
   * connectionsId object and with set() method. We need to manage the
   * refs for keys in HashMap properly. For now its ok.
   */
  while (true) {
    synchronized (putLock) { // synchronized to avoid put after stop
      if (!running.get()) {
        throw new IOException("Failed to get connection for " + remoteId
            + ", " + call + ": " + this + " is already stopped");
      }
      // Create the Connection object if none is cached for this remoteId
      connection = connections.computeIfAbsent(remoteId,
          id -> new Connection(id, serviceClass, removeMethod));
    }
    // Add the call to the connection's pending calls;
    // the server's response arrives asynchronously
    if (connection.addCall(call)) {
      break;
    } else {
      // This connection is closed, should be removed. But other thread could
      // have already known this closedConnection, and replace it with a new
      // connection. So we should call conditional remove to make sure we only
      // remove this closedConnection.
      removeMethod.accept(connection);
    }
  }

  // If the server happens to be slow, the method below will take longer to
  // establish a connection.
  // Connect to the server and set up the I/O streams
  connection.setupIOstreams(fallbackToSimpleAuth);
  return connection;
}
```

Two calls matter here. connection.setupIOstreams(fallbackToSimpleAuth) connects to the server and sets up the I/O streams held in the connection's ipcStreams member, through which data is later written and read; it also starts the connection thread that waits for responses. The request itself is sent afterwards by connection.sendRpcRequest(call) in the call method. Return values are handled on that other thread, and connection.addCall is what registers the call for it:

```java
private synchronized boolean addCall(Call call) {
  if (shouldCloseConnection.get())
    return false;
  calls.put(call.id, call);
  notify();
  return true;
}
```

Here the call object is added to calls. The other thread reads from calls, as its run method shows:

```java
@Override
public void run() {
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": starting, having connections "
        + connections.size());

  try {
    while (waitForWork()) { // wait here for work - read or close connection
      receiveRpcResponse();
    }
  } catch (Throwable t) {
    // This truly is unexpected, since we catch IOException in receiveResponse
    // -- this is only to be really sure that we don't leave a client hanging
    // forever.
    LOG.warn("Unexpected error reading responses on connection " + this, t);
    markClosed(new IOException("Error reading responses", t));
  }

  close();

  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": stopped, remaining connections "
        + connections.size());
}
```

The loop condition is waitForWork, which decides based on whether the connection has timed out and whether calls holds any pending call.

The response is handled as follows:

```java
private void receiveRpcResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  touch();

  try {
    // Wait for the response data from the network
    ByteBuffer bb = ipcStreams.readResponse();
    RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb);
    RpcResponseHeaderProto header =
        packet.getValue(RpcResponseHeaderProto.getDefaultInstance());
    checkResponse(header);
    // Read the callId
    int callId = header.getCallId();
    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + callId);
    // The result has arrived: fetch the matching request from calls and
    // remove it from the collection, then store the return value in the
    // call object and update its status
    RpcStatusProto status = header.getStatus();
    if (status == RpcStatusProto.SUCCESS) {
      Writable value = packet.newInstance(valueClass, conf);
      final Call call = calls.remove(callId);
      call.setRpcResponse(value);
      if (call.alignmentContext != null) {
        call.alignmentContext.receiveResponseState(header);
      }
    }
    // verify that packet length was correct
    if (packet.remaining() > 0) {
      throw new RpcClientException("RPC response length mismatch");
    }
    if (status != RpcStatusProto.SUCCESS) { // Rpc Request failed
      final String exceptionClassName = header.hasExceptionClassName() ?
          header.getExceptionClassName() :
          "ServerDidNotSetExceptionClassName";
      final String errorMsg = header.hasErrorMsg() ?
          header.getErrorMsg() : "ServerDidNotSetErrorMsg";
      final RpcErrorCodeProto erCode =
          (header.hasErrorDetail() ? header.getErrorDetail() : null);
      if (erCode == null) {
        LOG.warn("Detailed error code not set by server on rpc error");
      }
      RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
      if (status == RpcStatusProto.ERROR) {
        final Call call = calls.remove(callId);
        call.setException(re);
      } else if (status == RpcStatusProto.FATAL) {
        // Close the connection
        markClosed(re);
      }
    }
  } catch (IOException e) {
    markClosed(e);
  }
}
```

This is the flow for receiving the return value asynchronously: the result is stored in the call object. To see how it is consumed, we go back to the main thread.

## 3.3 getRpcResponse

By this point the result has been received and stored in the call object. Here is how it is taken out:

```java
private Writable getRpcResponse(final Call call, final Connection connection,
    final long timeout, final TimeUnit unit) throws IOException {
  synchronized (call) {
    // Loop until the call object reports that the request has completed
    while (!call.done) {
      try {
        AsyncGet.Util.wait(call, timeout, unit);
        if (timeout >= 0 && !call.done) {
          return null;
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Call interrupted");
      }
    }
    // Check whether the call ended in an error; otherwise take out the return value
    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        InetSocketAddress address = connection.getRemoteAddress();
        throw NetUtils.wrapException(address.getHostName(),
            address.getPort(),
            NetUtils.getHostname(),
            0,
            call.error);
      }
    } else {
      return call.getRpcResponse();
    }
  }
}
```
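
To tie the pieces together, here is a minimal, self-contained sketch of the same pattern: a JDK dynamic proxy forwards each method call to a client, the client registers a call in a table keyed by call id, and a separate receiver thread completes the call and wakes the waiting caller. This is not Hadoop's code. The names RpcClientSketch, Echo, and wire are hypothetical, the "server" is faked inside the receiver thread, and an in-memory queue stands in for the socket.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class RpcClientSketch {
  /** Hypothetical protocol interface, standing in for RPCProtocolTest. */
  public interface Echo {
    String echo(String message);
  }

  /** One in-flight request; the caller waits on it until done is set. */
  static final class Call {
    final int id;
    final String method;
    final Object[] args;
    Object response;
    boolean done;

    Call(int id, String method, Object[] args) {
      this.id = id;
      this.method = method;
      this.args = args;
    }

    synchronized void setResponse(Object value) {
      response = value;
      done = true;
      notifyAll(); // wake the caller blocked in awaitResponse()
    }

    synchronized Object awaitResponse() throws InterruptedException {
      while (!done) { // loop guards against spurious wakeups
        wait();
      }
      return response;
    }
  }

  private final AtomicInteger nextId = new AtomicInteger();
  // Pending calls keyed by call id, like Connection.calls in the post.
  private final Map<Integer, Call> calls = new ConcurrentHashMap<>();
  // Assumption: an in-memory queue replaces the real socket.
  private final BlockingQueue<Call> wire = new LinkedBlockingQueue<>();

  public RpcClientSketch() {
    Thread receiver = new Thread(() -> {
      try {
        while (true) {
          Call request = wire.take();
          // Fake server: echo the first argument back.
          Object result = "echo: " + request.args[0];
          // Match the response to its pending call by id, then complete it.
          Call pending = calls.remove(request.id);
          if (pending != null) {
            pending.setResponse(result);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "receiver");
    receiver.setDaemon(true);
    receiver.start();
  }

  /** Registers the call, "sends" it, then blocks until the receiver completes it. */
  Object call(String method, Object[] args) throws InterruptedException {
    Call call = new Call(nextId.getAndIncrement(), method, args);
    calls.put(call.id, call); // register before sending so the response can be matched
    wire.put(call);
    return call.awaitResponse();
  }

  /** Builds a dynamic proxy whose handler forwards every method to call(). */
  @SuppressWarnings("unchecked")
  public <T> T getProxy(Class<T> protocol) {
    InvocationHandler handler = (proxy, method, args) -> call(method.getName(), args);
    return (T) Proxy.newProxyInstance(protocol.getClassLoader(),
        new Class<?>[] { protocol }, handler);
  }

  public static void main(String[] args) {
    Echo echo = new RpcClientSketch().getProxy(Echo.class);
    System.out.println(echo.echo("hello")); // prints "echo: hello"
  }
}
```

Registering the call before sending it mirrors the order in getConnection and call above: if the request went out first, a fast response could arrive before the table held an entry to match it against.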