Chapter 4-1: Hadoop RPC Source Code Analysis
李延 (LV6) · 2022-03-06
# 1 Server Side

## 1.1 Creation via build

Let's start with the server-side setup code:

```java
final RPC.Server localhost = new RPC.Builder(new Configuration())
    .setBindAddress("localhost")
    .setPort(8080)
    .setProtocol(RPCProtocolTest.class)
    .setInstance(new RPCProtocolTestImpl())
    .build();
localhost.start();
```
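The protocol interface itself is not shown in this post. As a minimal sketch (the `echo` method and both type bodies are assumptions for illustration, carried over from the names in the snippet above), a Hadoop RPC protocol interface conventionally declares a `versionID` constant, which the framework reads reflectively via `RPC.getProtocolVersion()` at registration time:

```java
// Hypothetical protocol matching the builder example above.
public interface RPCProtocolTest {
  // Conventional version constant; RPC.getProtocolVersion() looks this
  // field up by name when the protocol is registered.
  long versionID = 1L;

  // Hypothetical business method exposed over RPC.
  String echo(String message);
}

// Trivial implementation passed to setInstance(...) above.
class RPCProtocolTestImpl implements RPCProtocolTest {
  @Override
  public String echo(String message) {
    return message;
  }
}
```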
The end product is an `RPC.Server` object. Let's look at how the `build` method creates it:

```java
public Server build() throws IOException, HadoopIllegalArgumentException {
  if (this.conf == null) {
    throw new HadoopIllegalArgumentException("conf is not set");
  }
  if (this.protocol == null) {
    throw new HadoopIllegalArgumentException("protocol is not set");
  }
  if (this.instance == null) {
    throw new HadoopIllegalArgumentException("instance is not set");
  }

  return getProtocolEngine(this.protocol, this.conf).getServer(
      this.protocol, this.instance, this.bindAddress, this.port,
      this.numHandlers, this.numReaders, this.queueSizePerHandler,
      this.verbose, this.conf, this.secretManager, this.portRangeConfig,
      this.alignmentContext);
}

// return the RpcEngine configured to handle a protocol
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
    Configuration conf) {
  RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
  if (engine == null) {
    Class<?> impl = conf.getClass(ENGINE_PROP + "." + protocol.getName(),
        WritableRpcEngine.class);
    engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
    PROTOCOL_ENGINES.put(protocol, engine);
  }
  return engine;
}
```

`build` first obtains an `RpcEngine` via `getProtocolEngine`; by default this is a `WritableRpcEngine`. So let's look at its `getServer` method:

```java
@Override
public RPC.Server getServer(Class<?> protocolClass,
    Object protocolImpl, String bindAddress, int port,
    int numHandlers, int numReaders, int queueSizePerHandler,
    boolean verbose, Configuration conf,
    SecretManager<? extends TokenIdentifier> secretManager,
    String portRangeConfig, AlignmentContext alignmentContext)
    throws IOException {
  return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
      numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
      portRangeConfig, alignmentContext);
}
```

Here the `Server` is simply instantiated with `new`. This `Server` is an inner class of `WritableRpcEngine` that extends `RPC.Server`; the rest of this section analyzes that class.

## 1.2 RPC.Server

`WritableRpcEngine.Server` extends `RPC.Server`, which in turn extends `org.apache.hadoop.ipc.Server`. These three classes each take on a different responsibility and together implement RPC:

- `org.apache.hadoop.ipc.Server` handles the networking
- `RPC.Server` registers all the business protocols
- `WritableRpcEngine.Server` implements the concrete wire protocol

## 1.3 org.apache.hadoop.ipc.Server

`org.apache.hadoop.ipc.Server` is responsible for all network transport: listening on the port, receiving data, and sending data. The concrete wire protocol is left to subclasses.

### 1.3.1 Component overview

The Server contains several inner classes, each with its own role:

- `Listener`: listens on the port and dispatches data reading.
- `Connection`: wraps a single connection; the request-parsing logic lives here.
- `Handler`: invokes the actual business method.
- `RpcCall` / `Call`: encapsulate one method invocation; this is what a handler executes.
- `Responder`: sends responses back asynchronously.
- `AuthProtocol`: user authentication.
- `ConnectionManager`: manages connections; it periodically scans all of them and closes those that have gone too long without a request. It is started when the Listener starts.

### 1.3.2 Initialization

First, the Server constructor:

```java
protected Server(String bindAddress, int port,
    Class<? extends Writable> rpcRequestClass, int handlerCount,
    int numReaders, int queueSizePerHandler, Configuration conf,
    String serverName, SecretManager<? extends TokenIdentifier> secretManager,
    String portRangeConfig) throws IOException {
  this.bindAddress = bindAddress;
  this.conf = conf;
  this.portRangeConfig = portRangeConfig;
  this.port = port;
  this.rpcRequestClass = rpcRequestClass;
  this.handlerCount = handlerCount;
  this.socketSendBufferSize = 0;
  this.serverName = serverName;
  this.auxiliaryListenerMap = null;
  this.maxDataLength = conf.getInt(
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
  if (queueSizePerHandler != -1) {
    this.maxQueueSize = handlerCount * queueSizePerHandler;
  } else {
    this.maxQueueSize = handlerCount * conf.getInt(
        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
  }
  this.maxRespSize = conf.getInt(
      CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
      CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
  if (numReaders != -1) {
    this.readThreads = numReaders;
  } else {
    this.readThreads = conf.getInt(
        CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
        CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
  }
  this.readerPendingConnectionQueue = conf.getInt(
      CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
      CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);

  // Setup appropriate callqueue
  final String prefix = getQueueClassPrefix();
  this.callQueue = new CallQueueManager<Call>(
      getQueueClass(prefix, conf),
      getSchedulerClass(prefix, conf),
      getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);

  this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
  this.authorize = conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);

  // configure supported authentications
  this.enabledAuthMethods = getAuthMethods(secretManager, conf);
  this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);

  // Start the listener here and let it bind to the port
  listener = new Listener(port);
  // set the server port to the default listener port.
  this.port = listener.getAddress().getPort();
  connectionManager = new ConnectionManager();
  this.rpcMetrics = RpcMetrics.create(this, conf);
  this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
  this.tcpNoDelay = conf.getBoolean(
      CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
      CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
  this.setLogSlowRPC(conf.getBoolean(
      CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
      CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));

  // Create the responder here
  responder = new Responder();

  if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
    SaslRpcServer.init(conf);
    saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
  }

  this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
  this.exceptionsHandler.addTerseLoggingExceptions(
      HealthCheckFailedException.class);
}
```

The constructor creates the various components; their roles are explained in the sections below.

The `start` method:

```java
public synchronized void start() {
  responder.start();
  listener.start();
  if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
    for (Listener newListener : auxiliaryListenerMap.values()) {
      newListener.start();
    }
  }
  handlers = new Handler[handlerCount];
  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}
```

So `start` simply launches the responder, the listener, and the handlers. Let's look at these three components in turn.

### 1.3.3 Listener

#### 1.3.3.1 Initialization

```java
Listener(int port) throws IOException {
  // create the NIO channel
  address = new InetSocketAddress(bindAddress, port);
  // Create a new server socket and set to non blocking mode
  acceptChannel = ServerSocketChannel.open();
  acceptChannel.configureBlocking(false);
  acceptChannel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddr);

  // Bind the server socket to the local host and port
  bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
  // Could be an ephemeral port
  this.listenPort = acceptChannel.socket().getLocalPort();
  Thread.currentThread().setName("Listener at " +
      bindAddress + "/" + this.listenPort);
  // create a selector
  selector = Selector.open();
  // create the Readers, each started on its own thread
  readers = new Reader[readThreads];
  for (int i = 0; i < readThreads; i++) {
    Reader reader = new Reader(
        "Socket Reader #" + (i + 1) + " for port " + port);
    readers[i] = reader;
    reader.start();
  }

  // Register accepts on the server socket with the selector,
  // i.e. listen for OP_ACCEPT events
  acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  this.setName("IPC Server listener on " + port);
  this.setDaemon(true);
  this.isOnAuxiliaryPort = false;
}
```

The initialization closely resembles the NIO networking pattern we analyzed earlier: open a channel and bind the port. Note that the constructor also creates the `Reader` objects, each running on its own thread; reading data is delegated to them.

#### 1.3.3.2 run

```java
@Override
public void run() {
  LOG.info(Thread.currentThread().getName() + ": starting");
  SERVER.set(Server.this);
  connectionManager.startIdleScan();
  // event loop
  while (running) {
    SelectionKey key = null;
    try {
      getSelector().select();
      // iterate over all currently triggered events
      Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            // connection requests are handled by doAccept
            if (key.isAcceptable())
              doAccept(key);
          }
        } catch (IOException e) {
        }
        key = null;
      }
    } catch (OutOfMemoryError e) {
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      LOG.warn("Out of Memory in server select", e);
      closeCurrentConnection(key, e);
      connectionManager.closeIdle(true);
      try { Thread.sleep(60000); } catch (Exception ie) {}
    } catch (Exception e) {
      closeCurrentConnection(key, e);
    }
  }
  LOG.info("Stopping " + Thread.currentThread().getName());

  synchronized (this) {
    try {
      acceptChannel.close();
      selector.close();
    } catch (IOException e) {
    }

    selector = null;
    acceptChannel = null;

    // close all connections
    connectionManager.stopIdleScan();
    connectionManager.closeAll();
  }
}
```

The run method is very simple: it waits for connection events and passes each new connection to `doAccept`:

```java
void doAccept(SelectionKey key) throws InterruptedException, IOException,
    OutOfMemoryError {
  // get the SocketChannel of the incoming connection
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    channel.socket().setKeepAlive(true);

    Reader reader = getReader();
    // wrap the channel in a Connection object
    Connection c = connectionManager.register(channel,
        this.listenPort, this.isOnAuxiliaryPort);
    // If the connectionManager can't take it, close the connection.
    if (c == null) {
      if (channel.isOpen()) {
        IOUtils.cleanupWithLogger(LOG, channel);
      }
      connectionManager.droppedConnections.getAndIncrement();
      continue;
    }
    key.attach(c);  // so closeCurrentConnection can get the object
    // hand the Connection over to a reader
    reader.addConnection(c);
  }
}
```

`doAccept` obtains the `SocketChannel` of the new connection, wraps it in a `Connection` object, and hands it to a reader. Two points are worth noting:

- `getReader()`: we created multiple readers earlier, and this method picks one with a simple round-robin strategy (sketched after this list).
- `reader.addConnection`, shown below:

```java
public void addConnection(Connection conn) throws InterruptedException {
  pendingConnections.put(conn);
  readSelector.wakeup();
}
```

So the `Connection` is simply put on a queue inside the reader; the reader's own run loop takes elements off that queue, and `wakeup()` wakes the loop up.
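As a minimal sketch of the round-robin selection mentioned above (the real field and method live inside `Listener`; this standalone wrapper is an illustration, not the Hadoop source):

```java
// Hypothetical round-robin reader selection, illustrating the strategy
// used by Listener.getReader().
final class ReaderPool {
  private final Reader[] readers;
  private int currentReader = 0;

  ReaderPool(Reader[] readers) {
    this.readers = readers;
  }

  // advance the index and wrap around, so that new connections are
  // spread evenly across all reader threads
  Reader getReader() {
    currentReader = (currentReader + 1) % readers.length;
    return readers[currentReader];
  }
}
```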
#### 1.3.3.3 The Reader object

As analyzed above, the Listener establishes connections and hands them to Reader objects. Let's look at a reader's run method:

```java
@Override
public void run() {
  LOG.info("Starting " + Thread.currentThread().getName());
  try {
    doRunLoop();
  } finally {
    try {
      readSelector.close();
    } catch (IOException ioe) {
      LOG.error("Error closing read selector in " +
          Thread.currentThread().getName(), ioe);
    }
  }
}

private synchronized void doRunLoop() {
  while (running) {
    SelectionKey key = null;
    try {
      // consume as many connections as currently queued to avoid
      // unbridled acceptance of connections that starves the select
      int size = pendingConnections.size();
      for (int i = size; i > 0; i--) {
        // take a connection off the queue
        Connection conn = pendingConnections.take();
        // register for read events
        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
      }
      readSelector.select();

      // iterate over the selector's pending read events
      Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          // handle read events via doRead
          if (key.isReadable()) {
            doRead(key);
          }
        } catch (CancelledKeyException cke) {
          // something else closed the connection, ex. responder or
          // the listener doing an idle scan. ignore it and let them
          // clean up.
          LOG.info(Thread.currentThread().getName() +
              ": connection aborted from " + key.attachment());
        }
        key = null;
      }
    } catch (InterruptedException e) {
      if (running) {                      // unexpected -- log it
        LOG.info(Thread.currentThread().getName() +
            " unexpectedly interrupted", e);
      }
    } catch (IOException ex) {
      LOG.error("Error in Reader", ex);
    } catch (Throwable re) {
      LOG.error("Bug in read selector!", re);
      ExitUtil.terminate(1, "Bug in read selector!");
    }
  }
}
```

This is also easy to follow: it registers each queued connection for read events and, when a read event fires, calls `doRead`:

```java
void doRead(SelectionKey key) throws InterruptedException {
  int count;
  Connection c = (Connection) key.attachment();
  if (c == null) {
    return;
  }
  c.setLastContact(Time.now());

  try {
    // read and parse the data via readAndProcess
    count = c.readAndProcess();
  } catch (InterruptedException ieo) {
    LOG.info(Thread.currentThread().getName() +
        ": readAndProcess caught InterruptedException", ieo);
    throw ieo;
  } catch (Exception e) {
    // Any exceptions that reach here are fatal unexpected internal errors
    // that could not be sent to the client.
    LOG.info(Thread.currentThread().getName() +
        ": readAndProcess from client " + c +
        " threw exception [" + e + "]", e);
    count = -1; // so that the (count < 0) block is executed
  }
  // setupResponse will signal the connection should be closed when a
  // fatal response is sent.
  if (count < 0 || c.shouldClose()) {
    closeConnection(c);
    c = null;
  } else {
    c.setLastContact(Time.now());
  }
}
```

So the actual reading and parsing of data is done by the `Connection` object's `readAndProcess` method, covered in the next section.

### 1.3.4 Connection

A `Connection` represents one client connection and contains the data-processing logic. As we saw in the previous section, reading and parsing is driven by `readAndProcess`:

```java
public int readAndProcess() throws IOException, InterruptedException {
  while (!shouldClose()) { // stop if a fatal response has been sent.
    // dataLengthBuffer is used to read "hrpc" or the rpc-packet length
    int count = -1;
    // read 4 bytes: "hrpc"
    if (dataLengthBuffer.remaining() > 0) {
      count = channelRead(channel, dataLengthBuffer);
      if (count < 0 || dataLengthBuffer.remaining() > 0)
        return count;
    }

    // handle the connection header
    if (!connectionHeaderRead) {
      // Every connection is expected to send the header;
      // so far we read "hrpc" of the connection header.
      if (connectionHeaderBuf == null) {
        // for the bytes that follow "hrpc", in the connection header
        // HEADER_LEN_AFTER_HRPC_PART is 3, i.e. read 3 more bytes
        connectionHeaderBuf = ByteBuffer.allocate(HEADER_LEN_AFTER_HRPC_PART);
      }
      count = channelRead(channel, connectionHeaderBuf);
      if (count < 0 || connectionHeaderBuf.remaining() > 0) {
        return count;
      }
      // the first byte is the version number, currently 9
      int version = connectionHeaderBuf.get(0);
      // TODO we should add handler for service class later
      // the second byte is the service class, currently 0
      this.setServiceClass(connectionHeaderBuf.get(1));
      dataLengthBuffer.flip();

      // Check if it looks like the user is hitting an IPC port
      // with an HTTP GET - this is a common error, so we can
      // send back a simple string indicating as much.
      if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
        setupHttpRequestOnIpcPortResponse();
        return -1;
      }

      // the header must be "hrpc"
      if (!RpcConstants.HEADER.equals(dataLengthBuffer)) {
        LOG.warn("Incorrect RPC Header length from {}:{} " +
            "expected length: {} got length: {}",
            hostAddress, remotePort, RpcConstants.HEADER, dataLengthBuffer);
        setupBadVersionResponse(version);
        return -1;
      }
      // the version numbers must match
      if (version != CURRENT_VERSION) {
        // Warning is ok since this is not supposed to happen.
        LOG.warn("Version mismatch from " + hostAddress + ":" + remotePort +
            " got version " + version +
            " expected version " + CURRENT_VERSION);
        setupBadVersionResponse(version);
        return -1;
      }

      // this may switch us into SIMPLE
      // the auth protocol is one of two: 0 for the default, or SASL
      authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));

      dataLengthBuffer.clear(); // clear to next read rpc packet len
      connectionHeaderBuf = null;
      connectionHeaderRead = true;
      continue; // connection header read, now read 4 bytes rpc packet len
    }

    // read the payload
    if (data == null) { // just read 4 bytes - length of RPC packet
      dataLengthBuffer.flip();
      // length of the data that follows
      dataLength = dataLengthBuffer.getInt();
      // reject anything over the limit, 128 * 1024 * 1024 by default
      checkDataLength(dataLength);
      // Set buffer for reading EXACTLY the RPC-packet length and no more.
      data = ByteBuffer.allocate(dataLength);
    }
    // Now read the RPC packet
    count = channelRead(channel, data);

    if (data.remaining() == 0) {
      dataLengthBuffer.clear(); // to read length of future rpc packets
      data.flip();
      ByteBuffer requestData = data;
      data = null; // null out in case processOneRpc throws.
      boolean isHeaderRead = connectionContextRead;
      // process the request
      processOneRpc(requestData);
      // the last rpc-request we processed could have simply been the
      // connectionContext; if so continue to read the first RPC.
      if (!isHeaderRead) {
        continue;
      }
    }
    return count;
  }
  return -1;
}
```

This shows how the server parses incoming data. The wire format is (a client-side sketch follows this list):

- a fixed 4-byte prefix, the characters `hrpc`;
- 3 more bytes: the version number, the service class (reserved, see the TODO above), and the auth protocol;
- then the payload itself, length-prefixed: a 4-byte int giving the length of the data that follows, then exactly that many bytes of data.
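To make the format concrete, here is a hedged sketch of how a client-side writer could emit this header and a packet; the constants mirror the checks above (version 9, service class 0, auth protocol 0 for the default), but the helper class itself is hypothetical:

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical helper illustrating the wire format parsed by readAndProcess.
final class ConnectionHeaderSketch {
  static void writeHeader(OutputStream out) throws IOException {
    DataOutputStream dos = new DataOutputStream(out);
    dos.writeBytes("hrpc"); // fixed 4-byte magic
    dos.writeByte(9);       // version, must equal CURRENT_VERSION
    dos.writeByte(0);       // service class (reserved)
    dos.writeByte(0);       // auth protocol: 0 = default, SASL otherwise
    dos.flush();
  }

  static void writePacket(OutputStream out, byte[] payload) throws IOException {
    DataOutputStream dos = new DataOutputStream(out);
    dos.writeInt(payload.length); // 4-byte length prefix
    dos.write(payload);           // exactly that many payload bytes
    dos.flush();
  }
}
```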
Next, let's follow `processOneRpc` to see how the payload data is handled:

```java
private void processOneRpc(ByteBuffer bb)
    throws IOException, InterruptedException {
  // exceptions that escape this method are fatal to the connection.
  // setupResponse will use the rpc status to determine if the connection
  // should be closed.
  int callId = -1;
  int retry = RpcConstants.INVALID_RETRY_COUNT;
  try {
    final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
    final RpcRequestHeaderProto header =
        getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
    callId = header.getCallId();
    retry = header.getRetryCount();
    if (LOG.isDebugEnabled()) {
      LOG.debug(" got #" + callId);
    }
    checkRpcHeaders(header);

    if (callId < 0) { // callIds typically used during connection setup
      processRpcOutOfBandRequest(header, buffer);
    } else if (!connectionContextRead) {
      throw new FatalRpcServerException(
          RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
          "Connection context not established");
    } else {
      processRpcRequest(header, buffer);
    }
  } catch (RpcServerException rse) {
    // inform client of error, but do not rethrow else non-fatal
    // exceptions will close connection!
    if (LOG.isDebugEnabled()) {
      LOG.debug(Thread.currentThread().getName() +
          ": processOneRpc from client " + this +
          " threw exception [" + rse + "]");
    }
    // use the wrapped exception if there is one.
    Throwable t = (rse.getCause() != null) ? rse.getCause() : rse;
    final RpcCall call = new RpcCall(this, callId, retry);
    setupResponse(call,
        rse.getRpcStatusProto(), rse.getRpcErrorCodeProto(), null,
        t.getClass().getName(), t.getMessage());
    sendResponse(call);
  }
}
```

Here the request header is first decoded into an `RpcRequestHeaderProto`, and the `callId` splits requests into two classes (the relevant constants are shown after this list):

- `callId < 0`: non-business requests, such as authentication and ping;
- `callId >= 0`: business requests, i.e. invocations of our own RPC methods.
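For orientation, the out-of-band call ids are negative constants. The following values are recalled from the Hadoop source (`RpcConstants` and `Server.AuthProtocol`) and are worth verifying against your Hadoop version:

```java
// Out-of-band call ids, as recalled from org.apache.hadoop.ipc.RpcConstants
// and Server.AuthProtocol; verify against your Hadoop version.
final class OutOfBandIds {
  static final int CONNECTION_CONTEXT_CALL_ID = -3; // connection context setup
  static final int PING_CALL_ID = -4;               // keep-alive ping

  enum AuthProtocol {
    NONE(0),   // plain connection, the default
    SASL(-33); // SASL negotiation messages carry callId -33

    final int callId;
    AuthProtocol(int callId) { this.callId = callId; }
  }
}
```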
#### 1.3.4.1 processRpcOutOfBandRequest

`processRpcOutOfBandRequest` handles the special requests:

- `PING_CALL_ID`: a ping;
- `CONNECTION_CONTEXT_CALL_ID`: parses the connection context, i.e. the user information and the protocol class being called.

```java
private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
    RpcWritable.Buffer buffer) throws RpcServerException,
    IOException, InterruptedException {
  final int callId = header.getCallId();
  if (callId == CONNECTION_CONTEXT_CALL_ID) {
    // SASL must be established prior to connection context
    if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {
      throw new FatalRpcServerException(
          RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
          "Connection header sent during SASL negotiation");
    }
    // read and authorize the user
    processConnectionContext(buffer);
  } else if (callId == AuthProtocol.SASL.callId) {
    // if client was switched to simple, ignore first SASL message
    if (authProtocol != AuthProtocol.SASL) {
      throw new FatalRpcServerException(
          RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
          "SASL protocol not requested by client");
    }
    saslReadAndProcess(buffer);
  } else if (callId == PING_CALL_ID) {
    LOG.debug("Received ping message");
  } else {
    throw new FatalRpcServerException(
        RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
        "Unknown out of band call #" + callId);
  }
}
```

A ping only produces a log line; there is no further handling. Its real effect happened earlier, when the connection's last-contact time was updated, which keeps an otherwise idle connection from being closed. Authentication is covered separately.

#### 1.3.4.2 processRpcRequest

This is where actual request data is handled:

```java
private void processRpcRequest(RpcRequestHeaderProto header,
    RpcWritable.Buffer buffer) throws RpcServerException,
    InterruptedException {
  Class<? extends Writable> rpcRequestClass =
      getRpcRequestWrapper(header.getRpcKind());
  if (rpcRequestClass == null) {
    LOG.warn("Unknown rpc kind " + header.getRpcKind() +
        " from client " + getHostAddress());
    final String err = "Unknown rpc kind in rpc header" +
        header.getRpcKind();
    throw new FatalRpcServerException(
        RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
  }
  Writable rpcRequest;
  try { // Read the rpc request
    rpcRequest = buffer.newInstance(rpcRequestClass, conf);
  } catch (RpcServerException rse) { // lets tests inject failures.
    throw rse;
  } catch (Throwable t) { // includes runtime exception from newInstance
    LOG.warn("Unable to read call parameters for client " +
        getHostAddress() + "on connection protocol " +
        this.protocolName + " for rpcKind " + header.getRpcKind(), t);
    String err = "IPC server unable to read call parameters: " +
        t.getMessage();
    throw new FatalRpcServerException(
        RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
  }

  TraceScope traceScope = null;
  if (header.hasTraceInfo()) {
    if (tracer != null) {
      // If the incoming RPC included tracing info, always continue the
      // trace
      SpanId parentSpanId = new SpanId(
          header.getTraceInfo().getTraceId(),
          header.getTraceInfo().getParentId());
      traceScope = tracer.newScope(
          RpcClientUtil.toTraceName(rpcRequest.toString()),
          parentSpanId);
      traceScope.detach();
    }
  }

  CallerContext callerContext = null;
  if (header.hasCallerContext()) {
    callerContext =
        new CallerContext.Builder(header.getCallerContext().getContext())
            .setSignature(header.getCallerContext().getSignature()
                .toByteArray())
            .build();
  }

  RpcCall call = new RpcCall(this, header.getCallId(),
      header.getRetryCount(), rpcRequest,
      ProtoUtil.convert(header.getRpcKind()),
      header.getClientId().toByteArray(), traceScope, callerContext);

  // Save the priority level assignment by the scheduler
  call.setPriorityLevel(callQueue.getPriorityLevel(call));
  call.markCallCoordinated(false);
  if (alignmentContext != null && call.rpcRequest != null &&
      (call.rpcRequest instanceof ProtobufRpcEngine2.RpcProtobufRequest)) {
    // if call.rpcRequest is not RpcProtobufRequest, will skip the following
    // step and treat the call as uncoordinated. As currently only certain
    // ClientProtocol methods request made through RPC protobuf needs to be
    // coordinated.
    String methodName;
    String protoName;
    ProtobufRpcEngine2.RpcProtobufRequest req =
        (ProtobufRpcEngine2.RpcProtobufRequest) call.rpcRequest;
    try {
      methodName = req.getRequestHeader().getMethodName();
      protoName = req.getRequestHeader().getDeclaringClassProtocolName();
      if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
        call.markCallCoordinated(true);
        long stateId;
        stateId = alignmentContext.receiveRequestState(
            header, getMaxIdleTime());
        call.setClientStateId(stateId);
      }
    } catch (IOException ioe) {
      throw new RpcServerException("Processing RPC request caught ", ioe);
    }
  }

  try {
    internalQueueCall(call);
  } catch (RpcServerException rse) {
    throw rse;
  } catch (IOException ioe) {
    throw new FatalRpcServerException(
        RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
  }
  incRpcCount();  // Increment the rpc count
}
```

The main steps are:

- obtain the `Writable` wrapper; which concrete `Writable` is used is decided by the subclass, and the request parameters have already been parsed into it;
- wrap everything into an `RpcCall` object;
- call `internalQueueCall`.

```java
private void internalQueueCall(Call call)
    throws IOException, InterruptedException {
  internalQueueCall(call, true);
}

private void internalQueueCall(Call call, boolean blocking)
    throws IOException, InterruptedException {
  try {
    // queue the call, may be blocked if blocking is true.
    if (blocking) {
      callQueue.put(call);
    } else {
      callQueue.add(call);
    }
    long deltaNanos = Time.monotonicNowNanos() - call.timestampNanos;
    call.getProcessingDetails().set(Timing.ENQUEUE, deltaNanos,
        TimeUnit.NANOSECONDS);
  } catch (CallQueueOverflowException cqe) {
    // If rpc scheduler indicates back off based on performance degradation
    // such as response time or rpc queue is full, we will ask the client
    // to back off by throwing RetriableException. Whether the client will
    // honor RetriableException and retry depends the client and its policy.
    // For example, IPC clients using FailoverOnNetworkExceptionRetry handle
    // RetriableException.
    rpcMetrics.incrClientBackoff();
    // unwrap retriable exception.
    throw cqe.getCause();
  }
}
```

So the call object is simply placed on the `callQueue` (owned by the Server) to wait for processing. At this point the request has been fully parsed; next let's see how the method itself gets invoked.

### 1.3.5 Handler

The Handler is the business-processing component. The Listener accepted and parsed the request and wrapped it in a call stored on the `callQueue`; the Handler is the consumer of that queue:

```java
@Override
public void run() {
  LOG.debug(Thread.currentThread().getName() + ": starting");
  SERVER.set(Server.this);
  while (running) {
    TraceScope traceScope = null;
    Call call = null;
    long startTimeNanos = 0;
    // True iff the connection for this call has been dropped.
    // Set to true by default and update to false later if the connection
    // can be succesfully read.
    boolean connDropped = true;

    try {
      call = callQueue.take(); // pop the queue; maybe blocked here
      startTimeNanos = Time.monotonicNowNanos();
      if (alignmentContext != null && call.isCallCoordinated() &&
          call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
        /*
         * The call processing should be postponed until the client call's
         * state id is aligned (<=) with the server state id.
         * NOTE:
         * Inserting the call back to the queue can change the order of call
         * execution comparing to their original placement into the queue.
         * This is not a problem, because Hadoop RPC does not have any
         * constraints on ordering the incoming rpc requests.
         * In case of Observer, it handles only reads, which are
         * commutative.
         */
        // Re-queue the call and continue
        requeueCall(call);
        continue;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": " + call +
            " for RpcKind " + call.rpcKind);
      }
      CurCall.set(call);
      if (call.traceScope != null) {
        call.traceScope.reattach();
        traceScope = call.traceScope;
        traceScope.getSpan().addTimelineAnnotation("called");
      }
      // always update the current call context
      CallerContext.setCurrent(call.callerContext);
      UserGroupInformation remoteUser = call.getRemoteUser();
      connDropped = !call.isOpen();
      if (remoteUser != null) {
        remoteUser.doAs(call);
      } else {
        call.run();
      }
    } catch (InterruptedException e) {
      if (running) {                          // unexpected -- log it
        LOG.info(Thread.currentThread().getName() +
            " unexpectedly interrupted", e);
        if (traceScope != null) {
          traceScope.getSpan().addTimelineAnnotation(
              "unexpectedly interrupted: " +
              StringUtils.stringifyException(e));
        }
      }
    } catch (Exception e) {
      LOG.info(Thread.currentThread().getName() +
          " caught an exception", e);
      if (traceScope != null) {
        traceScope.getSpan().addTimelineAnnotation("Exception: " +
            StringUtils.stringifyException(e));
      }
    } finally {
      CurCall.set(null);
      IOUtils.cleanupWithLogger(LOG, traceScope);
      if (call != null) {
        updateMetrics(call, startTimeNanos, connDropped);
        ProcessingDetails.LOG.debug(
            "Served: [{}]{} name={} user={} details={}",
            call, (call.isResponseDeferred() ? ", deferred" : ""),
            call.getDetailedMetricsName(), call.getRemoteUser(),
            call.getProcessingDetails());
      }
    }
  }
  LOG.debug(Thread.currentThread().getName() + ": exiting");
}
```

The actual method invocation happens in `call.run()`, which we analyze in the subclass. As for the result, it is written back to the socket directly through the connection.
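What `call.run()` does is not shown in this post. As a rough, simplified sketch (paraphrased from memory, not the literal Hadoop source; the real `RpcCall.run()` carries much more state handling for deferred responses, exception mapping, and metrics), an `RpcCall` dispatches to the engine's `call(...)` and hands the result to the response path:

```java
// Simplified sketch of the RpcCall execution path described above.
public Void run() throws Exception {
  // dispatch to Server.call(...), which WritableRpcEngine.Server overrides
  // to deserialize parameters and invoke the target method reflectively
  Writable value = call(rpcKind, connection.protocolName,
      rpcRequest, timestampNanos);
  // package the return value and send it back through the connection,
  // with the Responder finishing any write that does not complete inline
  setupResponse(this, RpcStatusProto.SUCCESS, null, value, null, null);
  sendResponse();
  return null;
}
```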
### 1.3.6 ConnectionManager

The ConnectionManager tracks all connections and closes those that have gone too long without a request.

#### 1.3.6.1 add

Adds a connection; called when a `Connection` is created.

```java
private boolean add(Connection connection) {
  boolean added = connections.add(connection);
  if (added) {
    count.getAndIncrement();
  }
  return added;
}
```

#### 1.3.6.2 startIdleScan

Scans all connections and closes the ones that have been idle too long:

```java
void startIdleScan() {
  scheduleIdleScanTask();
}

void stopIdleScan() {
  idleScanTimer.cancel();
}

private void scheduleIdleScanTask() {
  if (!running) {
    return;
  }
  TimerTask idleScanTask = new TimerTask() {
    @Override
    public void run() {
      if (!running) {
        return;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": task running");
      }
      try {
        closeIdle(false);
      } finally {
        // explicitly reschedule so next execution occurs relative
        // to the end of this scan, not the beginning
        // i.e. the task reschedules itself to keep scanning
        scheduleIdleScanTask();
      }
    }
  };
  idleScanTimer.schedule(idleScanTask, idleScanInterval);
}

// synch'ed to avoid explicit invocation upon OOM from colliding with
// timer task firing
synchronized void closeIdle(boolean scanAll) {
  long minLastContact = Time.now() - maxIdleTime;
  // concurrent iterator might miss new connections added
  // during the iteration, but that's ok because they won't
  // be idle yet anyway and will be caught on next scan
  int closed = 0;
  for (Connection connection : connections) {
    // stop if connections dropped below threshold unless scanning all
    if (!scanAll && size() < idleScanThreshold) {
      break;
    }
    // stop if not scanning all and max connections are closed
    // close connections whose last contact is past the deadline
    if (connection.isIdle() &&
        connection.getLastContact() < minLastContact &&
        close(connection) &&
        !scanAll && (++closed == maxIdleToClose)) {
      break;
    }
  }
}
```

`startIdleScan` is invoked when the Listener starts.

## 1.4 RPC.Server

When we build the Server, our business implementation class is registered here. The key method is `registerProtocolAndImpl`:

```java
// Register protocol and its impl for rpc calls
void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
    Object protocolImpl) {
  String protocolName = RPC.getProtocolName(protocolClass);
  long version;
  try {
    version = RPC.getProtocolVersion(protocolClass);
  } catch (Exception ex) {
    LOG.warn("Protocol " + protocolClass +
        " NOT registered as cannot get protocol version ");
    return;
  }

  getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
      new ProtoClassProtoImpl(protocolClass, protocolImpl));
  if (LOG.isDebugEnabled()) {
    LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName +
        " version=" + version +
        " ProtocolImpl=" + protocolImpl.getClass().getName() +
        " protocolClass=" + protocolClass.getName());
  }
  String client = SecurityUtil.getClientPrincipal(protocolClass, getConf());
  if (client != null) {
    // notify the server's rpc scheduler that the protocol user has
    // highest priority. the scheduler should exempt the user from
    // priority calculations.
    try {
      setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1);
    } catch (Exception ex) {
      LOG.warn("Failed to set scheduling priority for " + client, ex);
    }
  }
}
```

Note the `RpcKind` parameter, which identifies the wire protocol:

```java
RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
```

In our default setup it is the second one, `RPC_WRITABLE`. In the end the protocol is wrapped in a `ProtoClassProtoImpl` and stored in a map, keyed by protocol name and version; this map is consulted again during the request handling analyzed above.

## 1.5 WritableRpcEngine

This class implements the concrete protocol, i.e. the parsing of parameters and return values.
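A detailed walkthrough is left for later, but as a hedged sketch of the idea (simplified; the real `WritableRpcEngine.Server.call` also handles version checks, metrics, and error wrapping): the request `Writable` is an `Invocation` carrying the method name, parameter types, and parameter values, and the registered implementation is invoked reflectively:

```java
import java.lang.reflect.Method;

// Simplified sketch of the WritableRpcEngine dispatch idea described above;
// an illustration, not the actual Hadoop source.
final class WritableCallSketch {
  static Object invoke(Object protocolImpl, String methodName,
      Class<?>[] parameterClasses, Object[] parameters) throws Exception {
    // locate the business method on the implementation class
    Method method = protocolImpl.getClass()
        .getMethod(methodName, parameterClasses);
    method.setAccessible(true);
    // invoke it; the return value is then wrapped as a Writable
    // and handed back to the response path
    return method.invoke(protocolImpl, parameters);
  }
}
```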
# 2 Client Side

To be covered in a later post.

---

## Replies
**李延** (LV6, OP) · 2022-03-06
> A new start for 2022. Keep it up!