交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第5 章:网络ServerCnxnFactory
分享
未结
0
934
李延
LV6
2021-09-10
悬赏:20积分
# 1. 背景 在zookeeper中的网络是通过tcp与客户端进行交互的,而所有的网络请求都是通过ServerCnxnFactory来管理,其中主要涉ServerCnxnFactory与ServerCnxn 两个组件。其具体作用为: - ServerCnxnFactory 监听端口。当有客户端时,与客户端连接连接。并创建与当前客户端的连接,交给ServerCnxn来处理。 - 由ServerCnxn创建,一个ServerCnxn代表一个与某个客户端具体的连接。处理其具体的网络交互。 # 2. 实现 在zookeeper中有两个不同的实现 原生的NIO和netty实现,默认使用的时nio实现,这里我们只具体分析nio的实现。 # 3. NIOServerCnxnFactory 在上面的说明我们指定NIOServerCnxnFactory主要作用是用于与客户端建立请求,那么他应该主要涉及一下几点事情要完成: - 建立监听端口 - 监听连接事件,创建与客户端的连 - 接收客户端接发送的数据,交给ServerCnxn处理请求 额外的还有监听发送数据事件,与关闭超时的连接。 对于这几个步骤在NIOServerCnxnFactory中共有一下几个组件 - AcceptThread 该线程接收来自客户端的连接。 - SelectorThread 处理所有的读写事件。 - WorkerService 工作线程,SelectorThread接收到的读写事件交给WorkerService来处理 - ConnectionExpirerThread 处理连接超时情况 上面的4个组件分别在各种线程中进行工作,根据不同需要,各种开启一个或者多个线程进行数据处理。而且他们之间的通信都是通过队列来完成,一个完整的流程如下: AcceptThread 监听OP_ACCEPT事件,当有客户端建立连接时,AcceptThread监听到事件,同时将事件添加到 SelectorThread的队列中。 SelectorThread当检查到队列中有数据时,创建与客户端的socket连接,并创建ServerCnxn对象,同时监听read事件。当接收到数据后读取将请求交给WorkerService处理请求。 ConnectionExpirerThread 是一个独立的线程,不断监听所有的连接,如果发现有连接超时,则关闭当前连接。 ## 3.1 初始化 在创建完对象后通过configure方法配置需要监听的方法 ```JAVA /** * * * @param addr 服务器绑定地址 * @param maxcc 客户端最大连接数 * @param backlog TCP 服务端用于临时存放已完成三次握手的请求的队列的最大长度 ,-1没有限制 * @param secure * @throws IOException */ @Override public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException { if (secure) { throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn"); } //sasl认证 configureSaslLogin(); maxClientCnxns = maxcc; initMaxCnxns(); sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); // We also use the sessionlessCnxnTimeout as expiring interval for // cnxnExpiryQueue. These don't need to be the same, but the expiring // interval passed into the ExpiryQueue() constructor below should be // less than or equal to the timeout. //检查连接超时的队列 并创建对于的线程 cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout); expirerThread = new ConnectionExpirerThread(); int numCores = Runtime.getRuntime().availableProcessors(); // 32 cores sweet spot seems to be 4 selector threads numSelectorThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores / 2), 1)); if (numSelectorThreads < 1) { throw new IOException("numSelectorThreads must be at least 1"); } numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores); workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000); String logMsg = "Configuring NIO connection handler with " + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, " + numSelectorThreads + " selector thread(s), " + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and " + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers.")); LOG.info(logMsg); //创建selector线程,也就是接收请求的线程。需要的数量根据cpu核数确定 for (int i = 0; i < numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } //开启监听端口 listenBacklog = backlog; this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port {}", addr); if (listenBacklog == -1) { ss.socket().bind(addr); } else { ss.socket().bind(addr, listenBacklog); } ss.configureBlocking(false); //创建acceptThread对象 acceptThread = new AcceptThread(ss, addr, selectorThreads); } ``` 在这里我们看到了 对于监听端口的绑定和我们之前分析的几个组件的创建。下面我们详细分析这几个组件的代码 ## 3.2 AcceptThread ```JAVA public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException { super("NIOServerCxnFactory.AcceptThread:" + addr); this.acceptSocket = ss; //监听OP_ACCEPT事件 this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT); //将selector转换为一个不断循环的迭代器,这样就可以实现不同线程的轮询 this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads)); selectorIterator = this.selectorThreads.iterator(); } public void run() { try { //不断循环调用select while (!stopped && !acceptSocket.socket().isClosed()) { try { select(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } } finally { closeSelector(); // This will wake up the selector threads, and tell the // worker thread pool to begin shutdown. if (!reconfiguring) { NIOServerCnxnFactory.this.stop(); } LOG.info("accept thread exitted run method"); } } public void setReconfiguring() { reconfiguring = true; } private void select() { try { // 执行select等待客户端连接 selector.select(); //获取到连接 Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { //获取SocketChannel对象,并将其添加到selector的队列中 if (!doAccept()) { // If unable to pull a new connection off the accept // queue, pause accepting to give us time to free // up file descriptors and so the accept thread // doesn't spin in a tight loop. pauseAccept(10); } } else { LOG.warn("Unexpected ops in accept select {}", key.readyOps()); } } } catch (IOException e) { LOG.warn("Ignoring IOException while selecting", e); } } ``` ## 3.3 SelectorThread ```JAVA public void run() { try { while (!stopped) { try { //调用select,获取read事件, select(); //将队列中的数据注册read 事件,等待读取数据 processAcceptedConnections(); processInterestOpsUpdateRequests(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } // Close connections still pending on the selector. Any others // with in-flight work, let drain out of the work queue. for (SelectionKey key : selector.keys()) { NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment(); if (cnxn.isSelectable()) { cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); } cleanupSelectionKey(key); } SocketChannel accepted; while ((accepted = acceptedQueue.poll()) != null) { fastCloseSock(accepted); } updateQueue.clear(); } finally { closeSelector(); // This will wake up the accept thread and the other selector // threads, and tell the worker thread pool to begin shutdown. NIOServerCnxnFactory.this.stop(); LOG.info("selector thread exitted run method"); } } ``` ### 3.3.1 processAcceptedConnections  这里我看到创建了NIOServerCnxn的对象。并且注册了OP_READ事件 ### 3.3.2 select    对于请求我们看到最终是通过workerPool开启新的线程进行处理,具体的处理逻辑还是通过NIOServerCnxn的doIO方法进行 # 4 NIOServerCnxn 前面我们分析到请求是交给了NIOServerCnxn的doIO方法。这里我们就从这个方法看起 ```java void doIO(SelectionKey k) throws InterruptedException { try { if (!isSocketOpen()) { LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId)); return; } //读数据事件 if (k.isReadable()) { //incomingBuffer 长度为4 ,读取前4个字节。读取到当前请求有多数字节 int rc = sock.read(incomingBuffer); if (rc < 0) { //根据上面读取到字节,重新创建incomingBuffer长度。 handleFailedRead(); } //如果为0 说明前面后面没有字节 if (incomingBuffer.remaining() == 0) { boolean isPayload; if (incomingBuffer == lenBuffer) { // start of next request incomingBuffer.flip(); //检查字节长度,如果字节长度大于规定最大长度,报错 isPayload = readLength(k); incomingBuffer.clear(); } else { // continuation isPayload = true; } if (isPayload) { // not the case for 4letterword //读取真正的数据内容 readPayload(); } else { // four letter words take care // need not do anything else return; } } } //写数据事件 if (k.isWritable()) { handleWrite(k); if (!initialized && !getReadInterest() && !getWriteInterest()) { throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE); } } } catch (CancelledKeyException e) { LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId)); LOG.debug("CancelledKeyException stack trace", e); close(DisconnectReason.CANCELLED_KEY_EXCEPTION); } catch (CloseRequestException e) { // expecting close to log session closure close(); } catch (EndOfStreamException e) { LOG.warn("Unexpected exception", e); // expecting close to log session closure close(e.getReason()); } catch (ClientCnxnLimitException e) { // Common case exception, print at debug level ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1); LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e); close(DisconnectReason.CLIENT_CNX_LIMIT); } catch (IOException e) { LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e); close(DisconnectReason.IO_EXCEPTION); } } ``` 我们看到在接收到的数据中,就开始的4位是描述当前内容长度的对于非法的直接排除,之后交给readPayload ```java /** Read the request payload (everything following the length prefix) */ private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { handleFailedRead(); } } if (incomingBuffer.remaining() == 0) { // have we read length bytes? incomingBuffer.flip(); packetReceived(4 + incomingBuffer.remaining()); //处理请求 if (!initialized) { readConnectRequest(); } else { readRequest(); } lenBuffer.clear(); incomingBuffer = lenBuffer; } } protected void readRequest() throws IOException { zkServer.processPacket(this, incomingBuffer); } ``` 这里我们看到它将数据接收到一个incomingBuffer中,并通过readConnectRequest和readRequest处理请求 而具体的过程交给zkServer来处理
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号