Netty源代码-Future和Promise

Netty 4在异步代码中大量涉及到Future、Promise和Listener这三类接口。在这主要梳理一下这三者的大致概念。

  • Future
    和Channel中类似,Netty在JDK中Future基础上进行扩充时,直接采用了相同的接口名称。Netty中Future在JDK的基础上增加了成功失败状态(失败时提供异常的获取)、Listener以及await的概念。整体与JDK中语义类似,维持Future的只读属性。
  • Promise
    Netty在#868#873中参考Scala中Future和Promise的概念,将原有的ChannelFuture拆分出ChannelPromise作为可写接口,并在#1065中进一步将一般的Future/Promise接口抽象出来。
    现有Promise接口主要提供Future的成功失败以及取消的设置操作。

  • Listener
    Netty中Listener未像Scala中区分具体的成功失败等操作,仅提供了Future终态时的操作(operationComplete),这点与Guava中ListenableFuture一致。为性能方面考虑,Netty推荐采用Listener代替get操作Future。

上述接口主要位于io.netty.util.concurrent包中,可以作为一般的类库进行使用。

需要注意的是,在DefaultPromise的实现中,Netty并未像JDK中一样引入state进行状态标示,而是通过result的属性类型进行区分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

...

private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));

private volatile Object result;

...

@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}

...

@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}

...

@Override
public boolean setUncancellable() {
if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
return true;
}
Object result = this.result;
return !isDone0(result) || !isCancelled0(result);
}

...

private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}

private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}

...

private static boolean isCancelled0(Object result) {
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}

private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}

private static final class CauseHolder {
final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}

...

在Netty网络编程中则采用io.netty.channel中的子类接口和实现,类似ChannelFuture和ChannelPromise等,这些接口和类提供了与Channel的关联操作。

其中DefaultChannelPromise为ChannelPromise的默认实现类,不过具体使用中推荐通过Channel.newPromise()来调用。AbstractChannelHandlerContext在读写代码中会默认生成Promise对象兼容内部的异步操作,考虑到很多情况,例如write并不需要关心具体回调结果,Netty提供了空操作VoidChannelPromise类以减少类的生成(通过Channel.voidPromise()调用)。不过VoidChannelPromise在使用时需要**格外小心**:

Only do this if you are not interested in the ChannelFuture, and no ChannelOutboundHandler needs to add ChannelFutureListener to it!