工會(huì)網(wǎng)站建設(shè)管理工作總結(jié)軟文營(yíng)銷(xiāo)范文
目標(biāo)
- 服務(wù)器啟動(dòng)后,客戶端進(jìn)行連接,服務(wù)器端此時(shí)要接受客戶端請(qǐng)求,并且返回給客戶端想要的請(qǐng)求,下面我們的目標(biāo)就是分析Netty 服務(wù)器端啟動(dòng)后是怎么接受到客戶端請(qǐng)求的。
- 我們的代碼依然與上一篇中用同一個(gè)demo, 用io.netty.example下的echo包下的代碼
- 我們直接debug模式啟動(dòng)Server端,讓后在瀏覽器輸入Http://localhost:8007,接著以下代碼分析
源碼剖析
- 在上一篇文章Netty啟動(dòng)過(guò)程源碼分析中,我們知道了服務(wù)器最終注冊(cè) 一個(gè)Accept事件等待客戶端的連接,同時(shí)將NioServerSocketChannel注冊(cè)到boss單例線程池中,也就是EventLoop如上圖左邊黃色區(qū)域部分
- 因此我們想要分析接受client連接的代碼,先找到對(duì)應(yīng)的EventLoop源碼,如上圖中NioEventLoop 循環(huán),找到如下源碼
//代碼位置 NioEventLoop --- > run()
@Overrideprotected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {.......// 處理各種strategy類(lèi)型default:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {//對(duì)strategy事件進(jìn)行處理processSelectedKeys();}} finally {ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {.......}} else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks}.......} catch (CancelledKeyException e) {.......} catch (Throwable t) {handleLoopException(t);}.......}}
- 如上代碼中 strategy 更具請(qǐng)求的類(lèi)型走不同的策略,最后處理策略的方法是 processSelectedKeys();,我們繼續(xù)根核心方法 processSelectedKeys();,如下源碼
//進(jìn)入processSelectedKeys ---》processSelectedKeysOptimized(); ---〉processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {return;}if (eventLoop == this) {unsafe.close(unsafe.voidPromise());}return;}try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}
- 第一個(gè)if中最事件合法性驗(yàn)證,接著獲取readyOps,我們debug得到是16,如下圖
- 找到SelectionKey中16 代碼的意義
/*** Operation-set bit for socket-accept operations.** <p> Suppose that a selection key's interest set contains* <tt>OP_ACCEPT</tt> at the start of a <a* href="Selector.html#selop">selection operation</a>. If the selector* detects that the corresponding server-socket channel is ready to accept* another connection, or has an error pending, then it will add* <tt>OP_ACCEPT</tt> to the key's ready set and add the key to its* selected-key set. </p>*/public static final int OP_ACCEPT = 1 << 4;
- 術(shù)語(yǔ)連接請(qǐng)求,這就是我們拿到了之前用Http://localhost:8007 請(qǐng)求的連接,接著繼續(xù)跟進(jìn)代碼 EventLoopGroup —> processSelectedKey —> unsafe.read(); 其中unsafe是NioMessageUnsafed,上一篇中有過(guò)分析用來(lái)處理消息接收
- 繼續(xù)跟進(jìn)AbstractNioMessageChannel —> read() ,得到如下源碼,刪了一些對(duì)本次無(wú)關(guān)的一些代碼,如下
public void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);......allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}......if (exception != null) {......}if (closed) {......}} finally {......}}}
-
assert eventLoop().inEventLoop(); 判斷改eventLoop線程是否當(dāng)前線程
-
ChannelConfig config = config(); 獲取NioServerSocketChannelConfig
-
ChannelPipeline pipeline = pipeline(); 獲取DefaultChannelPipeline。他是一個(gè)雙向鏈表,可以看到內(nèi)部包含 LoggingHandler,ServerBootStraptHandler
-
繼續(xù)跟進(jìn) NioServersocketChannel —> doMessage(buf),可以進(jìn)入到NioServerSocketChannel,找到doMessage方法
protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}
-
參數(shù)buf是一個(gè)靜態(tài)隊(duì)列。private final List readBuf = new ArrayList(); 讀取boss線程中的NioServerSocketChannel接受到的請(qǐng)求,并且將請(qǐng)求放到buf容器中
-
SocketChannel ch = SocketUtils.accept(javaChannel()); 通過(guò)Nio中工具類(lèi)的建立連接,其實(shí)底層是調(diào)用了ServerSocketChannelImpl —> accept()方法建立TCP連接,并返回一個(gè)Nio中的SocketChannel
-
buf.add(new NioSocketChannel(this, ch)); 將獲取到的Nio中SocketCHannel包裝成Netty中的NioSocketChannel 并且添加到buf隊(duì)列中(list)
-
doReadMessages到這分析完。
-
我們回到回到EventLoopGroup —> ProcessSelectedKey
-
循環(huán)遍歷之前doReadMessage中獲取的buf中的所有請(qǐng)求,調(diào)用Pipeline的firstChannelRead方法,用于處理這些接受的請(qǐng)求或者其他事件,在read方法中,循環(huán)調(diào)用ServerSocket的Pipeline的fireChannelRead方法,開(kāi)始執(zhí)行管道中的handler的ChannelRead方法,如下
- 繼續(xù)跟進(jìn),進(jìn)入 pipeline.fireChannelRead(readBuf.get(i)); 一直跟到AbstracChannelHandlerContext —> invokeChannelRead
private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}}
- 進(jìn)入 handler() 中,DefaultChannelPipeline —> handler()
- debug源碼可以看到,在管道中添加了多個(gè)Handler,分別是:HeadContext,LoggingContext,ServerBootStrapAcceptor,TailContext 因此debug時(shí)候會(huì)依次進(jìn)入每一個(gè)Handler中。我們重點(diǎn)看ServerBootStrapAcceptor中的channelRead方法
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}
- 因?yàn)閰?shù)msg是NioSocketChannel,此處強(qiáng)制轉(zhuǎn)成channel,
- child.pipeline().addLast(childHandler); 將我們?cè)趍ain方法中設(shè)置的EchoServerHandler添加到pipeline的handler鏈表中
- setChannelOptions 對(duì)TCP參數(shù)賦值
- setAttributes 設(shè)置各種屬性
- childGroup.register(child).addListener(…) 將NioSocketChannel注冊(cè)到 NioEventLoopGroup中的一個(gè)EventLoop中,并且添加一個(gè)監(jiān)聽(tīng)器
- 以上NioEventLoopGroup就是我們main方法創(chuàng)建的數(shù)組workerGroup
- 進(jìn)入register方法, MultithreadEventLoopGroup —>register , SingleThreadEventLoop —>register , AbstractChannel —> register,如下
- 首先看MultithreadEventLoopGroup中的register
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}
- 進(jìn)入next()方法中,最終我們可以追到 DefaultEventExecutorChooserFactory — > PowerOfTwoEventExecutorChooser — > next() 內(nèi)部類(lèi)中的next
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}}
-
入上我們通過(guò)debug可以看到,next返回的就是我們?cè)趙orkerGroup中創(chuàng)建的線程數(shù)組中的某一個(gè)子線程EventExecutor
-
接下來(lái)我們?cè)诨氐絩egister方法: AbstractChannel —> register 方法,如下:
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {......AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {......}}}
- 關(guān)鍵方法register0
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
- 進(jìn)入 doRegister(); 方法:AbstractNioChannel —> doRegister
@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}
- 上代碼,selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);此處我們將bossGroup中的EventLoop的channel 注冊(cè)到workerGroup中的EventLoop中的 select中,方法中會(huì)得到一個(gè)selectionKey
- 我們可以看register方法的注視,如下:
Registers this channel with the given selector, returning a selectionkey.
使用給定的選擇器注冊(cè)此通道,并返回選擇鍵。
- 接著debug,最終會(huì)到 AbstractNioChannel 中的doBeginRead方法
@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}
- 此方法比較難進(jìn)入,包含了幾個(gè)異步,將之前的斷電去掉,再次http請(qǐng)求,可以到這個(gè)方法中
- 追到這里,針對(duì)客戶的連接已經(jīng)完成,接下來(lái)是讀取監(jiān)聽(tīng)事件,也就是bossGroup的連接建立,注冊(cè)步驟已近完成了,接下來(lái)就是workerGroup中的事件處理了
Netty接收請(qǐng)求過(guò)程梳理
-
總流程:接收連接 — 》創(chuàng)建一個(gè)新的NioSocketChannel —〉 注冊(cè)到一個(gè)WorkerEventLoop上 —》 注冊(cè)selecotRead事件
- 服務(wù)器淪陷Accept事件(文中最開(kāi)始的那個(gè)for循環(huán)),獲取事件后調(diào)用unsafe的read方法,這個(gè)unsafe是ServerSocket的內(nèi)部類(lèi),改方法內(nèi)部由2部分組成
- doReadMessage 用于創(chuàng)建NioSocketChannel對(duì)象,改對(duì)象包裝JDK的NioChannel客戶端,該方法創(chuàng)建一個(gè)ServerSocketChannel
- 之后執(zhí)行pipeline.firstChannelRead方法,并且將自己綁定到一個(gè)chooser選擇器選擇的workerGroup中的某個(gè)EventLoop上,并且注冊(cè)一個(gè)0(連接),表示注冊(cè)成功,但是并沒(méi)有注冊(cè)1 (讀取)
-
上一篇:Netty啟動(dòng)流程源碼剖析