政府門戶網站什么意思搜索引擎優(yōu)化seo優(yōu)惠
文章目錄
- 前言
- 一、Promise 接口
- 二、Netty 的 DefaultPromise
- 2.1、設置任務的成功或失敗
- 2.2、獲取 Future 任務執(zhí)行結果和添加監(jiān)聽事件
- 三、Netty 的 DefaultChannelPromise
- 總結
前言
回顧Netty系列文章:
- Netty 概述(一)
- Netty 架構設計(二)
- Netty Channel 概述(三)
- Netty ChannelHandler(四)
- ChannelPipeline源碼分析(五)
- 字節(jié)緩沖區(qū) ByteBuf (六)(上)
- 字節(jié)緩沖區(qū) ByteBuf(七)(下)
- Netty 如何實現零拷貝(八)
- Netty 程序引導類(九)
- Reactor 模型(十)
- 工作原理詳解(十一)
- Netty 解碼器(十二)
- Netty 編碼器(十三)
- Netty 編解碼器(十四)
- 自定義解碼器、編碼器、編解碼器(十五)
- Future 源碼分析(十六)
本篇文章我就就來分析一下可寫的 Future,也就是 promise,Netty 中的 Promise 擴展自 Netty 的 Future。
一、Promise 接口
在 Netty 中,Promise 接口是一種特殊的可寫的 Future。 Promise 的核心源碼如下:
public interface Promise<V> extends Future<V> {Promise<V> setSuccess(V var1);boolean trySuccess(V var1);Promise<V> setFailure(Throwable var1);boolean tryFailure(Throwable var1);boolean setUncancellable();Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> await() throws InterruptedException;Promise<V> awaitUninterruptibly();Promise<V> sync() throws InterruptedException;Promise<V> syncUninterruptibly();
}
從上面可以看出,Promise 就是一個可寫的 Future。在 Future 機制中,業(yè)務邏輯所在任務執(zhí)行的狀態(tài)(成功或失敗)是在 Future 中實現的;而在 Promise 中,可以在業(yè)務邏輯中控制任務的執(zhí)行結果,相比 Future 更加靈活。
以下是一個 Promise 的示例(偽代碼)。
//異步的耗時任務接收一個 Promise
public Promise asynchronousFunction() {Promise promise = new PromiseImpl();Object result = null;return =search() //業(yè)務邏輯if (sucess) {promise.setSuccess(result); //通知 promise 當前異步任務成功了,并傳入結果} else if (failed) {promise.setFailure(reason);//通知 promise 當前異步任務失敗了} else if (error) {promise.setFailure(error);//通知 promise 當前異步任務發(fā)生了異常}
}//調用異步的耗時操作
Promise promise = asynchronousFunction(promise);//會立即返回 promise//添加成功處理 / 失敗處理 / 異步處理等事件
promise.addListener();//例如:可以添加成功后的執(zhí)行事件//繼續(xù)做其他事件,不需要理會 asynchronousFunction 何時結束
doOtherThings();
在 Netty 中,Promise 繼承了 Future,因此也具備了 Future 的所有功能。在 Promise 機制中,可以在業(yè)務邏輯中人工設置業(yè)務邏輯的成功與失敗。
Netty 的常用 Promise 類有 DefaultPromise 類,這是 Promise 實現的基礎,DefaultChannelPromise 是 DefaultPromise 的子類,加入了channel屬性。
二、Netty 的 DefaultPromise
Netty 中涉及異步操作的地方都使用了 Promise 。例如,下面是服務器/客戶端啟動時的注冊任務,最終會調用 Unsafe 的 register,調用過程中會傳入一個Promise 。Unsafe 進行事件的注冊時調用 Promise 可以設置成功或者失敗。
//SingleThreadEventLoop.java
public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;
}//AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));} else if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));} else {AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {this.register0(promise);} else {try {eventLoop.execute(new Runnable() {public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}
}
DefaultPromise 提供的功能可以分為兩個部分;一個是為調用者提供 get()和addListen()用于獲取 Future 任務執(zhí)行結果和添加監(jiān)聽事件;另一部分是為業(yè)務處理任務提供setSucess()等方法設置任務的成功或失敗。
2.1、設置任務的成功或失敗
DefaultPromise 核心源碼如下:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {public Promise<V> setSuccess(V result) {if (this.setSuccess0(result)) {return this;} else {throw new IllegalStateException("complete already: " + this);}}public boolean trySuccess(V result) {return this.setSuccess0(result);}public Promise<V> setFailure(Throwable cause) {if (this.setFailure0(cause)) {return this;} else {throw new IllegalStateException("complete already: " + this, cause);}}public boolean tryFailure(Throwable cause) {return this.setFailure0(cause);}public boolean setUncancellable() {if (RESULT_UPDATER.compareAndSet(this, (Object)null, UNCANCELLABLE)) {return true;} else {Object result = this.result;return !isDone0(result) || !isCancelled0(result);}}public boolean isSuccess() {Object result = this.result;return result != null && result != UNCANCELLABLE && !(result instanceof DefaultPromise.CauseHolder);}public boolean isCancellable() {return this.result == null;}//...}
2.2、獲取 Future 任務執(zhí)行結果和添加監(jiān)聽事件
DefaultPromise 的get方法有 3 個。
無參數的get會阻塞等待;
有參數的get會等待指定事件,若未結束就拋出超時異常,這兩個get是在其父類 AbstractFuture中實現的。getNow()方法則會立馬返回結果。
源碼如下:
public V getNow() {Object result = this.result;return !(result instanceof DefaultPromise.CauseHolder) && result != SUCCESS && result != UNCANCELLABLE ? result : null;
}public V get() throws InterruptedException, ExecutionException {Object result = this.result;if (!isDone0(result)) {this.await();result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {Object result = this.result;if (!isDone0(result)) {if (!this.await(timeout, unit)) {throw new TimeoutException();}result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}
await() 方法判斷 Future 任務是否結束,之后獲取 this 鎖,如果任務未完成則調用 Object 的 wait()等待。源碼如下:
public Promise<V> await() throws InterruptedException { if (this.isDone()) {return this;} else if (Thread.interrupted()) {throw new InterruptedException(this.toString());} else {this.checkDeadLock();synchronized(this) {while(!this.isDone()) {this.incWaiters();try {this.wait();} finally {this.decWaiters();}}return this;}}//...
}
addListener 方法被調用時,將傳入的回調傳入listeners對象中。如果監(jiān)聽多于 1 個,會創(chuàng)建DeflaultFutureListeners對象將回調方法保存在一個數組中。
removeListener會將listeners設置為null(只有一個時)或從數組中移除(多個回調時)。源碼如下。
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.addListener0(listener);}if (this.isDone()) {this.notifyListeners();}return this;
} public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;int var5 = 0;while(var5 < var4) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener != null) {this.addListener0(listener);++var5;continue;}}}if (this.isDone()) {this.notifyListeners();}return this;
}public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.removeListener0(listener);return this;}
}public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;for(int var5 = 0; var5 < var4; ++var5) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener == null) {break;}this.removeListener0(listener);}return this;}
}
在添加監(jiān)聽器的過程中,如果任務剛好執(zhí)行完畢 done(),則立即觸發(fā)監(jiān)聽事件。觸發(fā)監(jiān)聽通過notifyListeners()實現。主要邏輯如下:
如果當前addListener的線程(準確來說應該是調用了notifyListeners的線程,因為addListener和setSuccess都會調用notifyListeners和 Promise 內的線程池)與當前執(zhí)行的線程是同一個線程,則放在線程池中執(zhí)行,否則提交到線程池中執(zhí)行;
而如果是執(zhí)行 Future 任務的線程池中的setSuccess時,調用notifyListeners(),會放在當前線程中執(zhí)行。內部維護了notifyListeners用來記錄是否已經觸發(fā)過監(jiān)聽事件,只有未觸發(fā)過且監(jiān)聽列表不為空,才會依次遍歷并調用operationComplete。
三、Netty 的 DefaultChannelPromise
DefaultChannelPromise 是 DefaultPromise 的子類,內部維護了一個通道變量 channel。
Promise 機制相關的方法都是調用父類方法。
除此之外,DefaultChannelPromise 還實現了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,可以將ChannelFuture注冊到ChannelFlushPromiseNotifier類,當有數據寫入或到達checkpoint時使用。
核心源碼如下:
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {private final Channel channel;private long checkpoint;//...public Channel channel() {return this.channel;}public ChannelPromise setSuccess() {return this.setSuccess((Void)null);}public ChannelPromise setSuccess(Void result) {super.setSuccess(result);return this;}public boolean trySuccess() {return this.trySuccess((Object)null);}public ChannelPromise setFailure(Throwable cause) {super.setFailure(cause);return this;}//...public ChannelPromise promise() {return this;}protected void checkDeadLock() {if (this.channel().isRegistered()) {super.checkDeadLock();}}public ChannelPromise unvoid() {return this;}public boolean isVoid() {return false;}
}
總結
以上我們分析了 Netty 中的 Promise,知道了它是擴展自 Netty 的 Future,是一個可寫的 Future。