带有 listenable 回调功能的 guava 线程池是
com.google.common.util.concurrent
包里十分重要的概念, 它们实现了任务执行完异步回调指定逻辑的功能, 在很大程度上解决了 java 原生组件 Future / FutureTask 阻塞获取结果的尴尬, 在生产实践中有着广泛的应用;
类族相关成员列举
guava 中与 ListeningExecutorService 相关的类都集中在 util.concurrent 包中, 主要分为三类:
(1) 包装返回 ListenableFutureTask 的 ExecutorService:
1 | ListeningExecutorService extends ExecutorService; |
(2) 与 ListenableFutureTask 相关的类, 实现异步回调的关键逻辑:
1 | ListenableFuture extends Future; |
(3) 便捷工具类, 主要是方便开发者以友好的方式使用 ListeningExecutorService 和 ListenableFutureTask:
1 | MoreExecutors; |
ListenableFutureTask 的异步回调原理
java 原生组件的关键支持
guava ListenableFuture 得以实现任务完成后异步回调指定逻辑的关键就在于 java.util.concurrent.FutureTask 留白了一个空方法:
1 | /** |
可以发现, 注释中说明了该方法将留给子类去重写以实现 “invoke completion callbacks”;
下面来看下这个空方法是如何被回调的:
(1) FutureTask 的 run 方法, 当任务跑完后会根据结果调用 set / setException 方法更新 state 状态;
1 | public void run() { |
(2) 以 set 方法为例, 更新完状态后会调用 finishCompletion() 方法;
1 | protected void set(V v) { |
另外除了 set 与 setException 方法之外, 还有 cancel(boolean mayInterruptIfRunning) 方法也回调了 finishCompletion() 方法;
(3) 在 finishCompletion() 方法中, 回调了留白的 done() 方法;
1 | private void finishCompletion() { |
ListenableFutureTask 正是继承了 FutureTask 并重写了 done() 方法, 实现了异步回调指定逻辑的功能;
ListenableFutureTask 的具体实现
在 ListenableFutureTask 中对 done() 方法的实现是这样的:
1 | /** |
其中, 类成员 executionList 的 execute() 方法逻辑如下:
1 | public void execute() { |
1 | /* |
其中, RunnableExecutorPair 是个链表节点, 存储了待执行的回调任务及执行任务的 executor; ExecutionList.execute 方法的内容就是将链表中的每个任务按照 原始入队的顺序 遍历执行;
所谓入队, 其实就是指我们得到一个 ListenableFuture 实例后为其添加的回调逻辑, 通常我们会调用 addListener(Runnable listener, Executor executor) 方法以实现异步回调;
而这里所说的 “原始入队的顺序”, 便是指 ListenableFuture 调用 addListener 方法添加回调任务的顺序;
ListenableFutureTask 实现的 addListener 方法是调用 executionList.add 方法:
1 | /* ExecutionList */ |
可以发现, add 方法是将新的 RunnableExecutorPair 插在了链表头上, 使得遍历链表的顺序与插入顺序相反, 所以 execute 方法中需要先反转链表才能执行;
以上内容便是 ListenableFutureTask 实现异步回调的基本原理;
guava 顺手实现的一些便捷工具类
虽然上文已描述了 ListenableFutureTask 异步回调的原理, 但这离我们的实际使用仍然相距甚远, 我们并不会主动构造 ListenableFutureTask, 也很少直接调用一个 ListenableFuture 实例的 addListener 方法, 这些都太不方便了;
guava 基于 ListenableFuture 又编写了一系列的工具类, 这些工具类简化了我们使用 ListenableFuture 的方式, 在生产环境中被普遍使用;
MoreExecutors
首先是入口 MoreExecutors, 我们通常使用 listeningDecorator 方法构造一个能够生产 ListenableFutureTask 的 ListeningExecutorService 实例:
1 | public static ListeningExecutorService listeningDecorator(ExecutorService delegate) { |
重点看最后一行, 这是我们代码里经常走到的一行逻辑: ListeningDecorator 使用了一个装饰器, 修饰了 ExecutorService 中一些重要的方法:
1 | private static class ListeningDecorator extends AbstractListeningExecutorService; |
ListeningDecorator 本身没有什么特殊的地方, 关键看它的父类 AbstractListeningExecutorService:
1 |
|
1 | /* ListenableFutureTask */ |
所以说, 这里就是生产 ListenableFutureTask 的地方了, MoreExecutors.listeningDecorator 返回的实例将被这些方法包装, 以能够构造出合适的 ListenableFutureTask 实例;
Futures
能够以友好方式构造 ListenableFutureTask 其实是不够的, 如果我们要主动调用其 addListener 方法, 就得自己处理回调任务中的各种异常, 类似下面这种模式:
1 | listenableFuture.addListener(() -> { |
很明显, 不是非常友好, 这种固化的逻辑完全是可以抽出来的, 于是 guava 提供了 Futures 类, 其中有一个方法 addCallback:
1 | public static <V> void addCallback(ListenableFuture<V> future, FutureCallback<? super V> callback) { |
1 | public static <V> void addCallback(final ListenableFuture<V> future, |
这段代码有两点:
(1) 使用 MoreExecutors.sameThreadExecutor() 构造执行回调的 executor;
MoreExecutors.sameThreadExecutor() 返回了一个自定义的 SameThreadExecutorService 实例, 这个类的特点是单线程, 回调任务都放在 executor.execute 所在线程里处理; 这样做十分得轻量化, 而如果使用 jdk 的 ThreadPoolExecutor, 很多时候都是为一个方法的执行付出创建一个线程池的开销;
(2) 抽象出一个 FutureCallback 留给使用者实现回调的特定逻辑, 其余的 future.get(), 异常处理等都封装到 addCallback 方法里了, 对于 addCallback 方法里遇到异常或是执行成功, 都只是回调 FutureCallback 的接口而已:
1 | public interface FutureCallback<V> { |
如此一来, 我们为 ListenableFutureTask 添加回调的方法就简洁多了:
1 | Futures.addCallback(listenableFuture, new FutureCallback<String>() { |