交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第4 章:Protocol
分享
未结
0
932
李延
LV6
2021-08-02
悬赏:20积分
# 作用 协议的抽象,对不同的通信协议进行了封装,比如HTTP协议和AJP协议 前面我们看到创建的Http11NioProtocol ,所以也以分析它为主 # 构造方法 ```java public Http11NioProtocol() { super(new NioEndpoint()); } ``` 这里我们看到创建了NioEndpoint,跟进到其父类 ```java public AbstractHttp11Protocol(AbstractEndpoint<S,?> endpoint) { super(endpoint); setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); ConnectionHandler<S> cHandler = new ConnectionHandler<>(this); setHandler(cHandler); getEndpoint().setHandler(cHandler); } ``` 在这里我们又看到创建了ConnectionHandler对象 # init与start ```java @Override public void init() throws Exception { //......... String endpointName = getName(); endpoint.setName(endpointName.substring(1, endpointName.length()-1)); endpoint.setDomain(domain); endpoint.init(); } @Override public void start() throws Exception { if (getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.start", getName())); logPortOffset(); } endpoint.start(); monitorFuture = getUtilityExecutor().scheduleWithFixedDelay( new Runnable() { @Override public void run() { if (!isPaused()) { startAsyncTimeout(); } } }, 0, 60, TimeUnit.SECONDS); } ``` 我们看到protocol的init和start就是对于endpoint的init和start # endpoint  在前面我们看到其创建的NioEndpoint对象,我们看一下它的一些方法 ## init ```java private void bindWithCleanup() throws Exception { try { bind(); } catch (Throwable t) { // Ensure open sockets etc. are cleaned up if something goes // wrong during bind ExceptionUtils.handleThrowable(t); unbind(); throw t; } } public final void init() throws Exception { if (bindOnInit) { bindWithCleanup(); bindState = BindState.BOUND_ON_INIT; } //········ } ``` 这里主要调用了bind方法 ```java @Override public void bind() throws Exception { initServerSocket(); //········· } // Separated out to make it easier for folks that extend NioEndpoint to // implement custom [server]sockets protected void initServerSocket() throws Exception { if (!getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); serverSock.socket().bind(addr,getAcceptCount()); } else { // Retrieve the channel provided by the OS Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } } serverSock.configureBlocking(true); //mimic APR behavior } ``` 最终我们在这里看到tomcat打开我们需要监听的端口并创建了一个serverSock用于接收请求。并且它是以阻塞的模式进行启动的。 ## start ```java @Override public void startInternal() throws Exception { if (!running) { running = true; paused = false; //创建栈结构,在tomcat中的对象如果使用完不会回收,而是放到这里,如果下次再需要使用,就直接从这里拿,不好创建新的对象 if (socketProperties.getProcessorCache() != 0) { processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); } if (socketProperties.getEventCache() != 0) { eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); } if (socketProperties.getBufferPool() != 0) { nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); } // Create worker collection //创建线程用户接收请求 if (getExecutor() == null) { createExecutor(); } initializeConnectionLatch(); // Start poller thread //创建poller。并开启一个新的线程执行 poller = new Poller(); Thread pollerThread = new Thread(poller, getName() + "-ClientPoller"); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); //创建acceptor,并开启一个新的线程执行 startAcceptorThread(); } } ``` 在start中我们看到分别创建了poller和acceptor。并且在一个新的线程中去执行 # acceptor ```java @Override public void run() { int errorDelay = 0; try { // Loop until we receive a shutdown command //判断服务是否停止 while (!stopCalled) { // Loop if endpoint is paused //判断endpoint是否暂停 while (endpoint.isPaused() && !stopCalled) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } //如果停止就退出循环 if (stopCalled) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait //判断当前接收请求的接受请求是否到达上限 endpoint.countUpOrAwaitConnection(); // Endpoint might have been paused while waiting for latch // If that is the case, don't accept new connections //如果endpoint已经暂停就continue if (endpoint.isPaused()) { continue; } U socket = null; try { // Accept the next incoming connection from the server // socket //接收tcp请求。这里就是调用在上一步在init是创建的serverSock。以阻塞的方式接收请求 socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // We didn't get a socket endpoint.countDownConnection(); if (endpoint.isRunning()) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket //当有新的请求时,先判断服务是否正常运行,否则断开请求 if (!stopCalled && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful //将新接收到的socket保存在poller的事件队列中 if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); String msg = sm.getString("endpoint.accept.fail"); // APR specific. // Could push this down but not sure it is worth the trouble. if (t instanceof Error) { Error e = (Error) t; if (e.getError() == 233) { // Not an error on HP-UX so log as a warning // so it can be filtered out on that platform // See bug 50273 log.warn(msg, t); } else { log.error(msg, t); } } else { log.error(msg, t); } } } } finally { stopLatch.countDown(); } state = AcceptorState.ENDED; } ``` 我们看到这里主要就是一个一直循环事件,去判断我们之前建立的tcp连接中是否有新的请求过来,如果有,就会将我们的socker对象添加到poller的队列中 # poller ```java @Override public void run() { // Loop until destroy() is called //事件运行 while (true) { boolean hasEvents = false; try { //判断是否已经关闭 if (!close) { //处理事件,在acceptor中我们将tcp连接添加在了队列中。 //而这里我们将其socker的read事件,添加到了selector中,等待数据传输 hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { // If we are here, means we have other stuff to do // Do a non blocking select //判断当前是否有read事件 keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue; } // Either we timed out or we woke up, process events first if (keyCount == 0) { hasEvents = (hasEvents | events()); } //对于有read事件时,获取其迭代器 Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper == null) { iterator.remove(); } else { iterator.remove(); //处理当前read事件 processKey(sk, socketWrapper); } } // Process timeouts timeout(keyCount,hasEvents); } getStopLatch().countDown(); } ``` 在这里我们看到poller中,去处理了之前acceptor中添加到队列的tcp连接。并且当有read事件时,调用了processKeyfangf ```java public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } SocketProcessorBase<S> sc = null; if (processorCache != null) { sc = processorCache.pop(); } if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; } ``` 这里我们看到将其分装在了SocketProcessorBase对象中,并通过线程池,开启新的线程去处理方法。 # 总结 Protocol中。我们可以看到endpoint组件是对于数据的接收,其中acceptor是对于tcp连接的建立,当我们接受到一个tcp连接时,将它交给poller来接收到读取文件的事件,并在次时通过线程池,为每个请求创建一个线程来处理数据。 最后通过pressor将接收到的数据解析为request和repost交给CoyoteAdapter来处理。
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号