交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
netty 第1 章:EventLoop与EventLoopGroup
分享
未结
0
1087
李延
LV6
2021-11-19
悬赏:20积分
# 1 背景 EventLoopGroup和EvenLoop是netty的重要组件,EventLoopGroup相当于一个线程池,在netty中所有的io事件与异步事件都是在EventLoopGroup中执行的,而一个EventLoopGroup管理着多个EventLoop,其中一个EventLoop就相当于一个线程,它就是一个正在执行某一个事件的线程。所以下面我们详细解析这两个接口 # 2 继承关系 ## 2.1 EventLoopGroup继承关系  ## 2.2 EventLoopGroup 接口说明 Executor 主要是对于提交任务的一些定义包括: ```java void execute(Runnable command); ``` 提交一个任务给 EventLoopGroup EventLoopGroup 主要是一些类似线程池的功能比如: ```java void shutdown(); boolean isShutdown(); <T> Future<T> submit(Callable<T> task); ``` ScheduledExecutorService 主要是一些调度任务的定义,比如: ```java public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); ``` 在指定时间后执行任务 EventExecutorGroup 基本于上面接口相同 EventLoopGroup 主要是对于io事件的定义 ```java @Override EventLoop next(); ``` 获取下一个EventLoop ```java /** * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture} * will get notified once the registration was complete. */ ChannelFuture register(Channel channel); /** * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed * {@link ChannelFuture} will get notified once the registration was complete and also will get returned. */ ChannelFuture register(ChannelPromise promise); /** * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture} * will get notified once the registration was complete and also will get returned. * * @deprecated Use {@link #register(ChannelPromise)} instead. */ @Deprecated ChannelFuture register(Channel channel, ChannelPromise promise); ``` register 注册感兴趣的io事件 我们通过上面的分析看到,除了有线程池的定义外,就是register方法,注册io事件这两类方法定义。 ## 2.3 EventLoop  很明显EventLoop继承自EventLoopGroup。也就是说它有这和EventLoopGroup一样的方法,毕竟EventLoopGroup相当于一个集合,而EventLoop相当于其中一个元素。 # 3 EventLoopGroup 功能实现 这里我们主要通过NioEventLoopGroup 为实现分析其具体实现。  ## 3.1 AbstractEventExecutorGroup EventExecutorGroup的抽象实现。我们看一下代码发现其关系提交任务的实现都是通过调用`next()`然后由具体的EventExecutor来实现的比如: ```java @Override public Future<?> submit(Runnable task) { return next().submit(task); } @Override public <T> Future<T> submit(Runnable task, T result) { return next().submit(task, result); } @Override public <T> Future<T> submit(Callable<T> task) { return next().submit(task); } @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return next().schedule(command, delay, unit); } ``` ## 3.2 MultithreadEventExecutorGroup 多线程 事件执行组, 这里主要的代码逻辑在构造函数中,其他主要是为了实现上面说的`next()`方法。 ```java protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { checkPositive(nThreads, "nThreads"); if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //根据参数初始化指定数量的EventExecutor children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //创建EventExecutor,具体由子类实现 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { //如果失败,进行关闭 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } //创建选择器 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } @Override public EventExecutor next() { return chooser.next(); } ``` 这里代码我们看到首先是创建了指定数量的EventExecutor,而`next()` 方法是由选择器chooser确定的。所以我们看一下chooser是如何执行的 ```java private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { // Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary. // The 64-bit long solves this by placing the overflow so far into the future, that no system // will encounter this in practice. private final AtomicLong idx = new AtomicLong(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; } } ``` 这里我们看到其实就是对于数组的一个轮训取值,也就是说:EventExecutor对于任务的执行就是挨着轮训执行任务。 ## 3.3 MultithreadEventLoopGroup 主要有两个实现,一个是默认情况先EventExecutor的数量,另一个是register实现。 ```java static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } ``` 我们看到默认情况下是cpu核心数量的2倍 而register与任务的执行也是一样的逻辑 ```java static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } ``` ## 3.4 NioEventLoopGroup NioEventLoopGroup主要是对于newChild的实现。这里创建的对象是NioEventLoop ```java @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { SelectorProvider selectorProvider = (SelectorProvider) args[0]; SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; EventLoopTaskQueueFactory taskQueueFactory = null; EventLoopTaskQueueFactory tailTaskQueueFactory = null; int argsLength = args.length; if (argsLength > 3) { taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; } if (argsLength > 4) { tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; } return new NioEventLoop(this, executor, selectorProvider, selectStrategyFactory.newSelectStrategy(), rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); } ``` ## 3.5 总结 NioEventLoopGroup 的功能就是相当于一个线程池,接受到任务和io事件后,交给具体的某个EventLoop去执行,而EventLoop的选择,则是通过choose来确定的。默认情况下是轮训的策略。在NioEventLoopGroup的实现中,创建的就是NioEventLoop对象。 # 4 NioEventLoop 功能解析  这里我们主要分析AbstractScheduledEventExecutor、SingleThreadEventExecutor、SingleThreadEventLoop、NioEventLoop 这3个实现进行解析 ## 4.1 AbstractScheduledEventExecutor 这个是对于延迟任务的实现,所有的延迟任务都保存在一个队列中: ```java PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue; ``` 获取最近一次的延迟任务: ```java final ScheduledFutureTask<?> peekScheduledTask() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null; } ``` 获取最近事件的延迟任务的时间: ```java final ScheduledFutureTask<?> peekScheduledTask() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null; } ``` 获取指定时间内的最近一次延迟任务: ```java protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) { return null; } scheduledTaskQueue.remove(); scheduledTask.setConsumed(); return scheduledTask; } ``` 我们看到这个类中主要是对于延迟任务的提交于获取,而没有起具体的执行方法。 ## 4.2 SingleThreadEventExecutor 单线程任务执行器,一个线程执行任务。 我们看到提交的任务都保存在`private final Queue<Runnable> taskQueue;`队列中。 我们看一下任务提交过程: ```java @Override public void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } } ``` 我们看到一方面对于任务提交会添加之前提到的队列中,而如果线程没有启动会启动线程。并且在最后有一个wakeup唤醒线程的操作。我们在后面说明。我们先看一下startThread。 ```java private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } } ``` 我们看到线程的启动只会执行一次,在未启动的状态下执行。 ```java private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { //................. } } }); } ``` 这里我们看到通过executor新启动一个线程去执行我们的任务,而具体的任务交给子类 SingleThreadEventExecutor.this.run();去实现。 ## 4.3 SingleThreadEventLoop 暂时没发现重要逻辑,忽略 ## 4.4 NioEventLoop 这里主要关注两个方法一个是selecor ,另一个就是之前启动的run方法。 在NioEventLoop中,每一个对象,都维护着一个selecor。而在run 方法中就监听这这个selecor监听到的事情。在构造函数中,我们可以看到selecor的创建过程: ```java NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector; } ``` 下面我们主要分析一下run方法 ```java @Override protected void run() { int selectCnt = 0; for (;;) { try { int strategy; try { // 如果有等待的任务,则返回监听到的io时间数量,如果没有等待任务,则返回SelectStrategy.SELECT strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: //判断最近一次的延迟任务 long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { //再次判断是否有任务 if (!hasTasks()) { //如果有延迟任务,则select等待至延迟任务开始,如果没有延迟任务就一直阻塞等待。 strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } //有io事件,或者任务需要执行时,会执行到这一步 selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { //先执行io事件 if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. //执行普通任务 final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0); // This will run the minimum number of tasks } if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } finally { // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } } } } ``` ## 4.5 processSelectedKey 我们上面看到最后 io事件是在这个方法中执行的: ```java private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //........ if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } ``` 而对于OP_READ或者OP_ACCEPT 是通过 不同的AbstractNioChannel 的内部类NioUnsafe来执行的。我们后续具体分析
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号