Netty 4在异步代码中大量涉及到Future、Promise和Listener这三类接口。在这主要梳理一下这三者的大致概念。
- Future
和Channel中类似,Netty在JDK中Future基础上进行扩充时,直接采用了相同的接口名称。Netty中Future在JDK的基础上增加了成功失败状态(失败时提供异常的获取)、Listener以及await的概念。整体与JDK中语义类似,维持Future的只读属性。
-
Promise
Netty在#868和#873中参考Scala中Future和Promise的概念,将原有的ChannelFuture拆分出ChannelPromise作为可写接口,并在#1065中进一步将一般的Future/Promise接口抽象出来。
现有Promise接口主要提供Future的成功失败以及取消的设置操作。
-
Listener
Netty中Listener未像Scala中区分具体的成功失败等操作,仅提供了Future终态时的操作(operationComplete),这点与Guava中ListenableFuture一致。为性能方面考虑,Netty推荐采用Listener代替get操作Future。
上述接口主要位于io.netty.util.concurrent
包中,可以作为一般的类库进行使用。
需要注意的是,在DefaultPromise的实现中,Netty并未像JDK中一样引入state进行状态标示,而是通过result的属性类型进行区分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
...
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result"); private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS"); private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE"); private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace( new CancellationException(), DefaultPromise.class, "cancel(...)"));
private volatile Object result;
...
@Override public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); }
...
@Override public Promise<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this, cause); }
...
@Override public boolean setUncancellable() { if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) { return true; } Object result = this.result; return !isDone0(result) || !isCancelled0(result); }
...
private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); }
private boolean setFailure0(Throwable cause) { return setValue0(new CauseHolder(checkNotNull(cause, "cause"))); }
...
private static boolean isCancelled0(Object result) { return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; }
private static boolean isDone0(Object result) { return result != null && result != UNCANCELLABLE; }
private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) { this.cause = cause; } }
...
|
在Netty网络编程中则采用io.netty.channel
中的子类接口和实现,类似ChannelFuture和ChannelPromise等,这些接口和类提供了与Channel的关联操作。
其中DefaultChannelPromise为ChannelPromise的默认实现类,不过具体使用中推荐通过Channel.newPromise()
来调用。AbstractChannelHandlerContext在读写代码中会默认生成Promise对象兼容内部的异步操作,考虑到很多情况,例如write并不需要关心具体回调结果,Netty提供了空操作VoidChannelPromise类以减少类的生成(通过Channel.voidPromise()
调用)。不过VoidChannelPromise在使用时需要**格外小心**:
Only do this if you are not interested in the ChannelFuture, and no ChannelOutboundHandler needs to add ChannelFutureListener to it!