交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第2 章:Future与listener
分享
未结
0
921
李延
LV6
2021-11-24
悬赏:20积分
# 1 背景 我们在前面分析到EventLoopGroup就是一个线程池,它可以执行我们提交的任务。而在我们通过submint提交普通任务,或者通过schedule提交延时任务时,会返回给我们一个Future对象,这个对象继承子java自己的Future。其中有一个方法可以添加listener,添加监听器。 当我们的任务执行完成后,就会调用监听器的方法。达到监听任务执行的作用。我们通过下面的例子来看看。 ```java EventLoopGroup eventExecutors = new NioEventLoopGroup(1); final Future<Integer> executor = eventExecutors.submit(() -> { System.out.println("执行普通任务"); return 2; }); executor.addListener(s -> System.out.println("执行普通任务监听器,结果为:" + s.get())); final ScheduledFuture<?> schedule = eventExecutors.schedule(() -> System.out.println("执行延期任务"), 2, TimeUnit.SECONDS); schedule.addListener(s -> System.out.println("执行延期任务监听器")); ``` 结果如下: 执行普通任务 执行普通任务监听器,结果为:2 执行延期任务 执行延期任务监听器 # 2 subnit解析 一直跟进代码我们会发现最后执行的是下面的内容: ```java @Override protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new PromiseTask<T>(this, callable); } ``` 我们看到最后其实创建的是`PromiseTask`对象。 # 3 继承关系  我们看到它继承了Runnable方法,也就是我我们任务其实执行的是PromiseTask的run方法,而我们自己正在的run 方法,是在PromiseTask里的run中别调用的。 # 4 主要接口说明Promise 在Promise中我们就看到了,一些关于监听器的定义 ```java @Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); ``` # 5 主要方法实现 ## 5.1 DefaultPromise DefaultPromise 主要是对于listener的一些实现 ```java @Override public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { checkNotNull(listeners, "listeners"); //添加监听器 synchronized (this) { for (GenericFutureListener<? extends Future<? super V>> listener : listeners) { if (listener == null) { break; } addListener0(listener); } } //如果当前任务已经执行完成,就直接调用监听器的方法 if (isDone()) { notifyListeners(); } return this; } ``` 继续跟进 ```java private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); } } ``` 我们看到最后就是将我们添加的监听器保存在一个成员变量listeners中 ## 5.2 PromiseTask 在这个类中,我们终于看到了对于run方法的实现。 ```java @Override public void run() { try { if (setUncancellableInternal()) { //调用我们的任务 V result = runTask(); //设置执行成功状态,并将返回值保存 setSuccessInternal(result); } } catch (Throwable e) { setFailureInternal(e); } } @SuppressWarnings("unchecked") V runTask() throws Throwable { //调用我们代码 final Object task = this.task; if (task instanceof Callable) { return ((Callable<V>) task).call(); } ((Runnable) task).run(); return null; } ``` 这里我们就可以看到我们自己的代码在runTask中被执行,而在执行后,还调用了setSuccessInternal方法,一直跟进到DefaultPromise中,我们看到: ```java @Override public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); } private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { notifyListeners(); } return true; } return false; } ``` 在上面代码中,我们看到做了两个事情, 1、设置任务执行成功状态,并保存结果。 2、如果有监听器,调用notifyListeners 方法,调用监听事件。
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号