Chapter 6-1: Server - Standalone - ZooKeeperServer
李延
2021-09-10
# 1. Background

When we analyzed the network layer earlier, we saw that once a request is received it is handed to ZooKeeperServer's processPacket. In other words, in standalone mode ZooKeeperServer drives the whole request-processing flow.

# 2. Request processor chain

After a request is received, it is passed to the RequestProcessor chain of responsibility. We saw this chain being initialized during server startup:

```java
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
```

The three processors are linked as PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.

## 2.1 What happens before the chain is invoked

We know the request goes straight to processPacket, so let's look at how that method reaches the chain:

```java
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    // Wrap the bytes in a stream
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    // Wrap the stream in an InputArchive
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    // Create the RequestHeader
    RequestHeader h = new RequestHeader();
    // Deserialize the header, reading its two int fields:
    //   xid  - id of the current operation
    //   type - type of the current operation
    h.deserialize(bia, "header");

    // Need to increase the outstanding request count first, otherwise
    // there might be a race condition that it enabled recv after
    // processing request and then disabled when check throttling.
    //
    // Be aware that we're actually checking the global outstanding
    // request before this request.
    //
    // It's fine if the IOException thrown before we decrease the count
    // in cnxn, since it will close the cnxn anyway.
    cnxn.incrOutstandingAndCheckThrottle(h);

    // Through the magic of byte buffers, txn will not be
    // pointing to the start of the txn
    incomingBuffer = incomingBuffer.slice();
    // Auth request
    if (h.getType() == OpCode.auth) {
        LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if (ap != null) {
            try {
                // handleAuthentication may close the connection, to allow the client to choose
                // a different server to connect to.
                authReturn = ap.handleAuthentication(
                    new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                    authPacket.getAuth());
            } catch (RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                authReturn = KeeperException.Code.AUTHFAILED;
            }
        }
        if (authReturn == KeeperException.Code.OK) {
            LOG.info("Session 0x{}: auth success for scheme {} and address {}",
                     Long.toHexString(cnxn.getSessionId()), scheme,
                     cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        } else {
            if (ap == null) {
                LOG.warn(
                    "No authentication provider for scheme: {} has {}",
                    scheme,
                    ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: {}", scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        }
        return;
    // SASL authentication
    } else if (h.getType() == OpCode.sasl) {
        processSasl(incomingBuffer, cnxn, h);
    } else {
        // Check whether the connection is authenticated
        if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
            // Authentication enforcement is failed
            // Already sent response to user about failure and closed the session, lets return
            return;
        } else {
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // Submit the request
            submitRequest(si);
        }
    }
}
```

The request payload is wrapped in a Request object and passed to submitRequest. The request is then queued in enqueueRequest:

```java
/**
 * Submits the request to {@linkplain RequestThrottler}.
 * The request is placed on RequestThrottler's queue and handled
 * asynchronously by its run method.
 * That thread is initialized in {@linkplain ZooKeeperServer#startupWithoutServing()}.
 *
 * @param si the request to enqueue
 */
public void enqueueRequest(Request si) {
    if (requestThrottler == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (requestThrottler == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    requestThrottler.submitRequest(si);
}
```

The request is submitted to the requestThrottler object, which applies throttling in one place. requestThrottler runs in its own thread and checks whether each request is within the limit. Requests that pass are handed back to zks (the ZooKeeperServer instance). In that method on zks, the request is handed to the first processor in the chain, and the rest of the processing happens on the chain's threads.
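
To make the flow above easier to picture, here is a minimal sketch of a queue-backed throttler thread feeding a three-stage chain. It is not ZooKeeper code: `ChainSketch`, `Processor`, `Stage` and `Throttler` are hypothetical names invented for illustration, requests are plain strings, and no real throttling decision is made. It also simplifies one thing: every stage runs on the throttler thread, whereas in the code above PrepRequestProcessor and SyncRequestProcessor are started as threads of their own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch; none of these types are ZooKeeper classes. */
final class ChainSketch {

    /** One link in the chain; plays the role RequestProcessor plays in the post. */
    interface Processor {
        void process(String request);
    }

    /** Records "name:request" in the trace, then forwards to the next link if there is one. */
    static final class Stage implements Processor {
        private final String name;
        private final Processor next;
        private final List<String> trace;

        Stage(String name, Processor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void process(String request) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request);
            }
        }
    }

    /** A queue drained by a single worker thread, standing in for the throttler. */
    static final class Throttler extends Thread {
        // Unique sentinel object; compared by identity so no real request can match it.
        private static final String STOP = new String("stop");
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final Processor first;

        Throttler(Processor first) {
            this.first = first;
        }

        void submit(String request) {
            queue.add(request);
        }

        void shutdown() {
            queue.add(STOP);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String request = queue.take();
                    if (request == STOP) {
                        return;
                    }
                    // A real throttler would decide here whether to admit the request.
                    first.process(request);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Builds prep -> sync -> final, pushes the requests through, and returns the trace. */
    static List<String> run(List<String> requests) {
        List<String> trace = new ArrayList<>();
        Processor fin = new Stage("final", null, trace);
        Processor sync = new Stage("sync", fin, trace);
        Processor prep = new Stage("prep", sync, trace);
        Throttler throttler = new Throttler(prep);
        throttler.start();
        for (String request : requests) {
            throttler.submit(request);
        }
        throttler.shutdown();
        try {
            throttler.join(); // join makes the worker's writes to trace visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("create", "getData")));
    }
}
```

As in setupRequestProcessors, the chain is built back to front so that each stage can be given its successor at construction time. The caller of `submit` returns immediately, which is the same decoupling enqueueRequest gets by handing the request to requestThrottler.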